添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

生产者使用 ES 的滚动(Scroll)API 遍历待迁移索引中的全部数据,并把每条文档放入队列;
消费者从队列中取出文档,根据 _id 调用新 ES 的查询 API 取回对应文档,再对两边的文档内容做 MD5 对比。
代码还有一些小缺陷,但不影响整体思路,可以借鉴;数据量大时可以改造成多生产者、多消费者模型。自测:107518 条数据约 10 分钟可以扫描完毕。

POM依赖

<!--
  Maven dependencies for the two source files listed below.
  - fastjson: JSON serialization of documents and log records (JSONObject / JSONArray).
  - elasticsearch + elasticsearch-rest-high-level-client: both pinned to 6.8.2; keep the two
    versions identical, and presumably aligned with the clusters being compared (TODO confirm).
  - commons-logging, jackson-databind, junit: not imported directly by the listed sources;
    presumably needed transitively or for tests (verify before removing).
  NOTE(review): fastjson 1.2.70 is an old release; check published security advisories and
  consider upgrading to the latest 1.2.x patch release.
-->
<dependencies>
  <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.70</version></dependency>
  <dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>1.2</version></dependency>
  <dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>6.8.2</version></dependency>
  <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>6.8.2</version></dependency>
  <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.9.9</version></dependency>
  <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency>
</dependencies>

ItemDataContrast.java

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
public class ItemDataContrast {
    private final static String LEFT_IP ="迁移的ES机器IP";
    private final static int LEFT_PORT =新机器端口;
    private final static String LEFT_USER ="迁移的ES机器账号";
    private final static String LEFT_PWD ="迁移的ES机器密码";
    private static final String INDEX_NAME = "对比的索引";
    private final static String RIGHT_IP ="新机器IP";
    private final static int RIGHT_PORT =新机器端口;
    private final static String RIGHT_USER ="新机器账号";
    private final static String RIGHT_PWD ="新机器密码";
 	private static final int OFFSET =10000;
    private static  int leftTotalHits =0;
    private  static int countItem = 0;
    private static LinkedHashMap<String,Object> map = new LinkedHashMap<>();
    private static RestHighLevelClient leftClient;
    private static RestHighLevelClient rightclient;
    private static BlockingQueue<SearchHit> queue = new LinkedBlockingQueue<>();
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒");
    public static void main(String[] args) {
        HttpHost httpHost = new HttpHost(LEFT_IP, LEFT_PORT);
        Header[] defaultHeaders = {new BasicHeader("charset", "utf-8"),new BasicHeader("content-type", "application/json")};
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(LEFT_USER, LEFT_PWD));
        RestClientBuilder builder = RestClient.builder(httpHost).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(
                    HttpAsyncClientBuilder httpClientBuilder) {
                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        builder.setDefaultHeaders(defaultHeaders);
        leftClient = new RestHighLevelClient(builder);
        // 创建生产者和消费者
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_开始核对");
        // 启动生产者和消费者线程
        Thread producerThread = new Thread(producer);
        producerThread.start();
        for (int i = 0;i<10;i++) {
            Thread consumerThread = new Thread(consumer);
            consumerThread.start();
    static class Producer implements Runnable {
        private  static int count = 1;
        @Override
        public void run() {
            try {
                SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
                searchRequest.scroll(TimeValue.timeValueMinutes(1L));
                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
                searchSourceBuilder.query(QueryBuilders.matchAllQuery());
                searchSourceBuilder.size(OFFSET);
                searchRequest.source(searchSourceBuilder);
                SearchResponse searchResponse = leftClient.search(searchRequest, RequestOptions.DEFAULT);
                leftTotalHits =  Integer.parseInt(String.valueOf(searchResponse.getHits().totalHits));
                FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+"_"+LEFT_IP+": 总数:"+leftTotalHits);
                String scrollId = searchResponse.getScrollId();
                SearchHit[] searchHits = searchResponse.getHits().getHits();
                FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_开始第"+count+"次遍历 生产者数据:"+OFFSET + "获取到的数据:"+searchHits.length);
                while (searchHits != null && searchHits.length > 0) {
                    count++;
                    for (SearchHit hit : searchHits) {
                        countItem++;
                        // 处理单个文档
                        queue.put(hit);
                    Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
                    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                    scrollRequest.scroll(scroll);
                    searchResponse = leftClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                    scrollId = searchResponse.getScrollId();
                    searchHits = searchResponse.getHits().getHits();
                    FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_开始第"+count+"次遍历 生产者数据:"+OFFSET + "获取到的数据:"+searchHits.length);
                // 清除滚动
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                ClearScrollResponse clearScrollResponse = leftClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
                System.out.println("生产者结束");
                FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_生产者任务完成");
            } catch (Exception e) {
                FileUtil.writeFile("logs","ItemData.log",JSONObject.toJSONString(e));
                e.printStackTrace();
                Thread.currentThread().interrupt();
    static class Consumer implements Runnable {
        static {
            HttpHost httpHost = new HttpHost(RIGHT_IP, RIGHT_PORT);
            Header[] defaultHeaders = {new BasicHeader("charset", "utf-8"),new BasicHeader("content-type", "application/json")};
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(RIGHT_USER, RIGHT_PWD));
            RestClientBuilder builder = RestClient.builder(httpHost).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(
                        HttpAsyncClientBuilder httpClientBuilder) {
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            builder.setDefaultHeaders(defaultHeaders);
            rightclient = new RestHighLevelClient(builder);
        @Override
        public void run() {
            try {
                while (true) {
                    SearchHit hit = queue.take();
                    Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                    SearchHit[] rightSearchHits = getData(rightclient,INDEX_NAME,"_id", hit.getId());
                    JSONArray jsonArray;
                    if(null==map.get("diff_ids")){
                        jsonArray = new JSONArray();
                    }else {
                        jsonArray = (JSONArray) map.get("diff_ids");
                    if(rightSearchHits==null || rightSearchHits.length==0){
                        if(null==map.get("diff_ids")){
                            jsonArray = new JSONArray();
                        }else {
                            jsonArray = (JSONArray) map.get("diff_ids");
                        JSONObject object =  new JSONObject();
                        object.put("id",hit.getId());
                        object.put("msg",RIGHT_IP+"无数据");
                        jsonArray.add(object);
                        map.put("diff_ids",jsonArray);
                        FileUtil.writeFile("logs","ItemDataError.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_核对数据错误:"+object.toJSONString());
                        //右边无数据直接跳过
                        continue;
                    String left = JSONObject.toJSONString(sourceAsMap);
                    String leftMd5 = md5(left);
                    boolean flag = false;
                    String right = "";
                    for (SearchHit searchHitR:rightSearchHits) {
                        right = JSONObject.toJSONString(searchHitR.getSourceAsMap());
                        String rightMd5 = md5(right);
                        if(leftMd5.equals(rightMd5)){
                            flag = true;
                            break;
                    if(!flag){
                        JSONObject diff = new JSONObject();
                        diff.put("id",hit.getId());
                        diff.put("msg",LEFT_IP+"_"+RIGHT_IP+"_MD5差异");
                        diff.put("left",left);
                        diff.put("right",right);
                        jsonArray.add(diff);
                        map.put("diff_ids",jsonArray);
                        FileUtil.writeFile("logs","ItemDataError.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_MD5差异:"+diff.toJSONString());
                    Map<String,Object> notDiffMap;
                    if(null==map.get("notDiff")){
                        notDiffMap = new HashMap<>();
                    }else {
                        notDiffMap = (Map<String, Object>) map.get("notDiff");
                    JSONObject notDiffOBJ =  new JSONObject();
                    notDiffOBJ.put("id",hit.getId());
                    notDiffOBJ.put("type",hit.getType());
                    notDiffMap.put(hit.getId()+"_"+hit.getType(),notDiffOBJ);
                    FileUtil.writeFile("logs","ItemDataSuccess.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"核对数据正确:"+notDiffOBJ.toJSONString());
                    map.put("notDiff",notDiffMap);
                    System.out.println(queue.size() + "___"+countItem+"___"+leftTotalHits);
                    if (queue.size()==0 ) {
                        FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_正确数据:"+notDiffMap.size());
                        FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_差异如下:"+jsonArray.size());
                        FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_数据完毕: 扫描行数"+countItem);
                        FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_数据完毕: 总行数"+leftTotalHits);
                        if(countItem==leftTotalHits){
                            FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_消费者完成对比");
                            break;
                System.out.println("消费者结束");
            } catch (Exception e) {
                e.printStackTrace();
                FileUtil.writeFile("logs","ItemData.log",JSONObject.toJSONString(e));
                Thread.currentThread().interrupt();
        public static String md5(String message) {
            try {
                // 创建MD5消息摘要对象
                MessageDigest md = MessageDigest.getInstance("MD5");
                // 计算消息的摘要
                byte[] digest = md.digest(message.getBytes());
                // 将摘要转换为十六进制字符串
                String hexString = bytesToHex(digest);
                return hexString;
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
                FileUtil.writeFile("logs","ItemData.log",JSONObject.toJSONString(e));
            return null;
        public static String bytesToHex(byte[] bytes) {
            StringBuilder hexString = new StringBuilder();
            for (byte b : bytes) {
                String hex = Integer.toHexString(0xff & b);
                if (hex.length() == 1) {
                    hexString.append('0');
                hexString.append(hex);
            return hexString.toString();
        public static SearchHit[] getData(RestHighLevelClient client,String indexName,String key,String value)  {
            //1 创建搜索文档请求
            SearchRequest searchRequest=new SearchRequest(indexName);  //请求索引
            SearchSourceBuilder builder = new SearchSourceBuilder();
            builder.query(QueryBuilders.matchQuery(key,value));
            searchRequest.source(builder);
            SearchHit[] hits=null;
                // 2 执行检索
                SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
                // 3 分析响应结果
                //遍历数据
                hits = response.getHits().getHits();
                return  hits;
            } catch (Exception e) {
                e.printStackTrace();
                FileUtil.writeFile("logs","ItemData.log",JSONObject.toJSONString(e));
            return  hits;

FileUtil.java

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Small helper for appending timestamped lines to log files.
 *
 * <p>{@link #write(String, String, String)} is synchronized so that lines appended by
 * several threads do not interleave.
 */
public class FileUtil {

    /** Directory used by {@link #mkdir(String)} when the caller passes {@code null} or a blank path. */
    private static final String DEFAULT_DIRECTORY = "D:" + File.separator + "usr" + File.separator + "log";

    /**
     * Creates a directory, including missing parents.
     *
     * @param directory path of the directory; {@code null} or blank selects the default
     *                  {@code D:/usr/log} location
     * @return {@code true} if the directory exists when the method returns, {@code false} if it
     *         could not be created (for example because a regular file has that name)
     */
    public static boolean mkdir(String directory) {
        if (null == directory || directory.trim().length() == 0) {
            directory = DEFAULT_DIRECTORY;
        }
        return ensureDirectory(new File(directory));
    }

    /**
     * Appends one timestamped line to {@code directory/fileName}, creating the directory and the
     * file when they do not exist yet.
     *
     * @param directory directory that holds the file
     * @param fileName  name of the file inside {@code directory}
     * @param context   text of the line (the timestamp and line break are added automatically)
     */
    public static void writeFile(String directory, String fileName, String context) {
        File parent = new File(directory, fileName).getParentFile();
        if (parent != null && !ensureDirectory(parent)) {
            // Report and give up instead of retrying: retrying cannot succeed and would recurse forever.
            System.err.println("FileUtil: cannot create directory " + parent + "; line not written: " + context);
            return;
        }
        // Opening the file in append mode creates it if needed, so no separate existence check
        // is required (such a check is racy when several threads log to a new file).
        write(directory, fileName, context);
    }

    /**
     * Appends {@code "<timestamp>:<context>\r\n"} to {@code directory/fileName}. The timestamp
     * format is {@code yyyy-MM-dd HH:mm:ss SSS} (the last field is milliseconds). An
     * {@link IOException} is printed to standard error and not propagated.
     *
     * @param directory directory that holds the file; must already exist
     * @param fileName  name of the file inside {@code directory}
     * @param context   text of the line
     */
    public static synchronized void write(String directory, String fileName, String context) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        String strTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date());
        // try-with-resources closes (and thereby flushes) the writers even when write() fails.
        try (BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(new File(directory, fileName), true))) {
            bufferedWriter.write(strTime + ":" + context + "\r\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Returns true when {@code dir} is a directory afterwards; tolerates concurrent creation. */
    private static boolean ensureDirectory(File dir) {
        // The trailing isDirectory() covers the case where another thread created the directory
        // between the first check and mkdirs(), which makes mkdirs() return false.
        return dir.isDirectory() || dir.mkdirs() || dir.isDirectory();
    }
}