From 7c5765b79aade837a0ba71a305186ef703571d42 Mon Sep 17 00:00:00 2001 From: Srivatsa Katta Date: Fri, 21 Jun 2013 13:33:21 +0530 Subject: [PATCH 1/3] Compatible with elasticsearch v0.20.6 --- .gitignore | 4 +++- pom.xml | 4 ++-- src/main/java/com/pannous/es/reindex/ReIndexAction.java | 6 +++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index b2509ac..a2d1010 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,6 @@ target/ nb-configuration.xml *~ data/ -deploy.sh \ No newline at end of file +deploy.sh +.idea/ +*.iml diff --git a/pom.xml b/pom.xml index 9f9a2d4..701c74f 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.pannous.es reindex - 0.90.0-SNAPSHOT + 0.20.6 jar Reindex Plugin @@ -12,7 +12,7 @@ UTF-8 - 0.90.0 + 0.20.6 diff --git a/src/main/java/com/pannous/es/reindex/ReIndexAction.java b/src/main/java/com/pannous/es/reindex/ReIndexAction.java index b5abcc3..1467bdb 100644 --- a/src/main/java/com/pannous/es/reindex/ReIndexAction.java +++ b/src/main/java/com/pannous/es/reindex/ReIndexAction.java @@ -189,10 +189,10 @@ Collection bulkUpdate(MySearchHits objects, String indexName, if (brb.numberOfActions() > 0) { BulkResponse rsp = brb.execute().actionGet(); if (rsp.hasFailures()) { - List list = new ArrayList(rsp.getItems().length); - for (BulkItemResponse br : rsp.getItems()) { + List list = new ArrayList(rsp.items().length); + for (BulkItemResponse br : rsp.items()) { if (br.isFailed()) - list.add(br.getItemId()); + list.add(br.itemId()); } return list; } From db0dc3142568707052ed5e041d6e0a29b31f45af Mon Sep 17 00:00:00 2001 From: Srivatsa Katta Date: Mon, 24 Jun 2013 15:00:31 +0530 Subject: [PATCH 2/3] Reindexing documents in background --- .../java/com/pannous/es/reindex/Indexer.java | 118 ++++++++++++++++++ .../com/pannous/es/reindex/ReIndexAction.java | 115 ++++------------- .../es/reindex/ReIndexActionTester.java | 12 +- 3 files changed, 150 insertions(+), 95 deletions(-) create mode 100644 src/main/java/com/pannous/es/reindex/Indexer.java diff --git a/src/main/java/com/pannous/es/reindex/Indexer.java b/src/main/java/com/pannous/es/reindex/Indexer.java new file mode 100644 index 0000000..361fb29 --- /dev/null +++ b/src/main/java/com/pannous/es/reindex/Indexer.java @@ -0,0 +1,118 @@ +package com.pannous.es.reindex; + +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.StopWatch; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class Indexer implements Runnable { + + private final ESLogger logger; + private Client client; + private MySearchResponse rsp; + private String newIndex; + private String newType; + private boolean withVersion; + private float waitSeconds; + + + public Indexer(Client client, MySearchResponse rsp, String newIndex, String newType, boolean withVersion, float waitSeconds) { + + logger = Loggers.getLogger(this.getClass()); + + this.client = client; + this.rsp = rsp; + this.newIndex = newIndex; + this.newType = newType; + this.withVersion = withVersion; + this.waitSeconds = waitSeconds; + } + + @Override + public void run() { + + boolean flushEnabled = false; + long total = rsp.hits().totalHits(); + int collectedResults = 0; + int failed = 0; + while (true) { + if (collectedResults > 0 && waitSeconds > 0) { + try { + Thread.sleep(Math.round(waitSeconds * 1000)); + } catch (InterruptedException ex) { + break; + } + } + StopWatch queryWatch = new StopWatch().start(); + int currentResults = rsp.doScoll(); + if (currentResults == 0) + break; + + MySearchHits res = rsp.hits(); + if (res == null) + break; + queryWatch.stop(); + StopWatch updateWatch = new StopWatch().start(); + failed += bulkUpdate(res, newIndex, newType, withVersion).size(); + if (flushEnabled) + client.admin().indices().flush(new FlushRequest(newIndex)).actionGet(); + + updateWatch.stop(); + collectedResults += currentResults; + logger.debug("Progress " + collectedResults + "/" + total + + ". Time of update:" + updateWatch.totalTime().getSeconds() + " query:" + + queryWatch.totalTime().getSeconds() + " failed:" + failed); + } + String str = "found " + total + ", collected:" + collectedResults + + ", transfered:" + (float) rsp.bytes() / (1 << 20) + "MB"; + if (failed > 0) + logger.warn(failed + " FAILED documents! " + str); + else + logger.info(str); + } + + Collection bulkUpdate(MySearchHits objects, String indexName, + String newType, boolean withVersion) { + BulkRequestBuilder brb = client.prepareBulk(); + for (MySearchHit hit : objects.getHits()) { + if (hit.id() == null || hit.id().isEmpty()) { + logger.warn("Skipped object without id when bulkUpdate:" + hit); + continue; + } + + try { + IndexRequest indexReq = Requests.indexRequest(indexName).type(newType).id(hit.id()).source(hit.source()); + if (withVersion) + indexReq.version(hit.version()); + + brb.add(indexReq); + } catch (Exception ex) { + logger.warn("Cannot add object:" + hit + " to bulkIndexing action." + ex.getMessage()); + } + } + if (brb.numberOfActions() > 0) { + BulkResponse rsp = brb.execute().actionGet(); + if (rsp.hasFailures()) { + List list = new ArrayList(rsp.items().length); + for (BulkItemResponse br : rsp.items()) { + if (br.isFailed()) + list.add(br.itemId()); + } + return list; + } + } + return Collections.emptyList(); + } + +} diff --git a/src/main/java/com/pannous/es/reindex/ReIndexAction.java b/src/main/java/com/pannous/es/reindex/ReIndexAction.java index 1467bdb..5c55747 100644 --- a/src/main/java/com/pannous/es/reindex/ReIndexAction.java +++ b/src/main/java/com/pannous/es/reindex/ReIndexAction.java @@ -1,11 +1,5 @@ package com.pannous.es.reindex; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -15,20 +9,23 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; -import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.RestChannel; -import org.elasticsearch.rest.RestController; -import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.XContentRestResponse; -import org.elasticsearch.rest.XContentThrowableRestResponse; -import static org.elasticsearch.rest.RestRequest.Method.*; -import static org.elasticsearch.rest.RestStatus.*; -import static org.elasticsearch.rest.action.support.RestXContentBuilder.*; +import org.elasticsearch.rest.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.rest.RestRequest.Method.PUT; +import static org.elasticsearch.rest.RestStatus.OK; +import static org.elasticsearch.rest.action.support.RestXContentBuilder.restContentBuilder; /** * Refeeds all the documents which matches the type and the (optional) query. @@ -37,7 +34,8 @@ */ public class ReIndexAction extends BaseRestHandler { - @Inject public ReIndexAction(Settings settings, Client client, RestController controller) { + @Inject + public ReIndexAction(Settings settings, Client client, RestController controller) { super(settings, client); if (controller != null) { @@ -47,7 +45,8 @@ public class ReIndexAction extends BaseRestHandler { } } - @Override public void handleRequest(RestRequest request, RestChannel channel) { + @Override + public void handleRequest(RestRequest request, RestChannel channel) { handleRequest(request, channel, null, false); } @@ -111,7 +110,7 @@ public void handleRequest(RestRequest request, RestChannel channel, String newTy } public SearchRequestBuilder createScrollSearch(String oldIndexName, String oldType, String filter, - int hitsPerPage, boolean withVersion, int keepTimeInMinutes) { + int hitsPerPage, boolean withVersion, int keepTimeInMinutes) { SearchRequestBuilder srb = client.prepareSearch(oldIndexName). setTypes(oldType). setVersion(withVersion). @@ -124,80 +123,14 @@ public SearchRequestBuilder createScrollSearch(String oldIndexName, String oldTy return srb; } - public int reindex(MySearchResponse rsp, String newIndex, String newType, boolean withVersion, - float waitSeconds) { - boolean flushEnabled = false; - long total = rsp.hits().totalHits(); - int collectedResults = 0; - int failed = 0; - while (true) { - if (collectedResults > 0 && waitSeconds > 0) { - try { - Thread.sleep(Math.round(waitSeconds * 1000)); - } catch (InterruptedException ex) { - break; - } - } - StopWatch queryWatch = new StopWatch().start(); - int currentResults = rsp.doScoll(); - if (currentResults == 0) - break; - - MySearchHits res = callback(rsp.hits()); - if (res == null) - break; - queryWatch.stop(); - StopWatch updateWatch = new StopWatch().start(); - failed += bulkUpdate(res, newIndex, newType, withVersion).size(); - if (flushEnabled) - client.admin().indices().flush(new FlushRequest(newIndex)).actionGet(); - - updateWatch.stop(); - collectedResults += currentResults; - logger.debug("Progress " + collectedResults + "/" + total - + ". Time of update:" + updateWatch.totalTime().getSeconds() + " query:" - + queryWatch.totalTime().getSeconds() + " failed:" + failed); - } - String str = "found " + total + ", collected:" + collectedResults - + ", transfered:" + (float) rsp.bytes() / (1 << 20) + "MB"; - if (failed > 0) - logger.warn(failed + " FAILED documents! " + str); - else - logger.info(str); - return collectedResults; - } - - Collection bulkUpdate(MySearchHits objects, String indexName, - String newType, boolean withVersion) { - BulkRequestBuilder brb = client.prepareBulk(); - for (MySearchHit hit : objects.getHits()) { - if (hit.id() == null || hit.id().isEmpty()) { - logger.warn("Skipped object without id when bulkUpdate:" + hit); - continue; - } + public Thread reindex(MySearchResponse rsp, String newIndex, String newType, boolean withVersion, + float waitSeconds) { - try { - IndexRequest indexReq = Requests.indexRequest(indexName).type(newType).id(hit.id()).source(hit.source()); - if (withVersion) - indexReq.version(hit.version()); + Indexer indexer = new Indexer(client, rsp, newIndex, newType, withVersion, waitSeconds); + Thread indexerThread = EsExecutors.daemonThreadFactory(settings, this.getClass().getCanonicalName()).newThread(indexer); + indexerThread.start(); - brb.add(indexReq); - } catch (Exception ex) { - logger.warn("Cannot add object:" + hit + " to bulkIndexing action." + ex.getMessage()); - } - } - if (brb.numberOfActions() > 0) { - BulkResponse rsp = brb.execute().actionGet(); - if (rsp.hasFailures()) { - List list = new ArrayList(rsp.items().length); - for (BulkItemResponse br : rsp.items()) { - if (br.isFailed()) - list.add(br.itemId()); - } - return list; - } - } - return Collections.emptyList(); + return indexerThread; } protected MySearchHits callback(MySearchHits hits) { diff --git a/src/test/java/com/pannous/es/reindex/ReIndexActionTester.java b/src/test/java/com/pannous/es/reindex/ReIndexActionTester.java index 0961347..2de6677 100644 --- a/src/test/java/com/pannous/es/reindex/ReIndexActionTester.java +++ b/src/test/java/com/pannous/es/reindex/ReIndexActionTester.java @@ -58,8 +58,10 @@ protected abstract MySearchResponse scrollSearch(String searchIndex, String sear refresh("oldtweets"); assertThat(count("oldtweets"), equalTo(2L)); - int res = action.reindex(scrollSearch("oldtweets", "tweet", ""), "tweets", "tweet", false, 0); - assertThat(res, equalTo(2)); + Thread reindex = action.reindex(scrollSearch("oldtweets", "tweet", ""), "tweets", "tweet", false, 0); + while(reindex.getState() != Thread.State.TERMINATED) { + + } refresh("tweets"); assertThat(count("tweets"), equalTo(2L)); @@ -76,8 +78,10 @@ protected abstract MySearchResponse scrollSearch(String searchIndex, String sear add("oldtweets", "tweet", "{ \"name\" : \"peter test\", \"count\" : 2}"); refresh("oldtweets"); assertThat(count("oldtweets"), equalTo(2L)); - int res = action.reindex(scrollSearch("oldtweets", "tweet", "{ \"term\": { \"count\" : 2} }"), "tweets", "tweet", false, 0); - assertThat(res, equalTo(1)); + Thread reindex = action.reindex(scrollSearch("oldtweets", "tweet", "{ \"term\": { \"count\" : 2} }"), "tweets", "tweet", false, 0); + while(reindex.getState() != Thread.State.TERMINATED) { + + } refresh("tweets"); assertThat(count("tweets"), equalTo(1L)); SearchResponse sr = client.prepareSearch("tweets").execute().actionGet(); From dfbac2c2574e0301839758d0dcf735d8152d7556 Mon Sep 17 00:00:00 2001 From: Srivatsa Katta Date: Mon, 24 Jun 2013 15:39:00 +0530 Subject: [PATCH 3/3] Fixed callback for hits while reindexing --- .../com/pannous/es/reindex/ExampleUsage.java | 58 +++++++++++-------- .../com/pannous/es/reindex/HitsCallback.java | 7 +++ .../java/com/pannous/es/reindex/Indexer.java | 6 +- .../com/pannous/es/reindex/ReIndexAction.java | 15 +---- 4 files changed, 49 insertions(+), 37 deletions(-) create mode 100644 src/main/java/com/pannous/es/reindex/HitsCallback.java diff --git a/src/main/java/com/pannous/es/reindex/ExampleUsage.java b/src/main/java/com/pannous/es/reindex/ExampleUsage.java index 7f604f8..5865967 100644 --- a/src/main/java/com/pannous/es/reindex/ExampleUsage.java +++ b/src/main/java/com/pannous/es/reindex/ExampleUsage.java @@ -1,9 +1,5 @@ package com.pannous.es.reindex; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.List; -import java.util.logging.Logger; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; @@ -15,6 +11,11 @@ import org.json.JSONException; import org.json.JSONObject; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Logger; + /** * Class to use the reindex plugin as rewrite/refeed plugin - directly from * java. @@ -60,23 +61,30 @@ public static void main(String[] args) { Settings emptySettings = ImmutableSettings.settingsBuilder().build(); RestController contrl = new RestController(emptySettings); ReIndexAction action = new ReIndexAction(emptySettings, client, contrl) { - @Override protected MySearchHits callback(MySearchHits hits) { - SimpleList res = new SimpleList(hitsPerPage, hits.totalHits()); - for (MySearchHit h : hits.getHits()) { - try { - String str = new String(h.source(), charset); - RewriteSearchHit newHit = new RewriteSearchHit(h.id(), h.version(), str); - String someField = newHit.get("some_field"); - if (someField.contains("some content")) { - newHit.put("some_field", "IT WORKS!"); + @Override + protected HitsCallback callback(MySearchHits hits) { + + return new HitsCallback() { + @Override + public MySearchHits transform(MySearchHits hits) { + SimpleList res = new SimpleList(hitsPerPage, hits.totalHits()); + for (MySearchHit h : hits.getHits()) { + try { + String str = new String(h.source(), charset); + RewriteSearchHit newHit = new RewriteSearchHit(h.id(), h.version(), str); + String someField = newHit.get("some_field"); + if (someField.contains("some content")) { + newHit.put("some_field", "IT WORKS!"); + } + + res.add(newHit); + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException(ex); + } } - - res.add(newHit); - } catch (UnsupportedEncodingException ex) { - throw new RuntimeException(ex); + return res; } - } - return res; + }; } }; // first query, further scroll-queries in reindex! @@ -105,7 +113,8 @@ public void add(MySearchHit hit) { hits.add(hit); } - @Override public Iterable getHits() { + @Override + public Iterable getHits() { return hits; } @@ -152,15 +161,18 @@ public JSONObject put(String key, Object obj) { } } - @Override public String id() { + @Override + public String id() { return id; } - @Override public long version() { + @Override + public long version() { return version; } - @Override public byte[] source() { + @Override + public byte[] source() { try { return json.toString().getBytes(charset); } catch (UnsupportedEncodingException ex) { diff --git a/src/main/java/com/pannous/es/reindex/HitsCallback.java b/src/main/java/com/pannous/es/reindex/HitsCallback.java new file mode 100644 index 0000000..253087b --- /dev/null +++ b/src/main/java/com/pannous/es/reindex/HitsCallback.java @@ -0,0 +1,7 @@ +package com.pannous.es.reindex; + +public class HitsCallback { + public MySearchHits transform(MySearchHits searchHits) { + return searchHits; + } +} diff --git a/src/main/java/com/pannous/es/reindex/Indexer.java b/src/main/java/com/pannous/es/reindex/Indexer.java index 361fb29..f5cb6f4 100644 --- a/src/main/java/com/pannous/es/reindex/Indexer.java +++ b/src/main/java/com/pannous/es/reindex/Indexer.java @@ -25,9 +25,11 @@ public class Indexer implements Runnable { private String newType; private boolean withVersion; private float waitSeconds; + private HitsCallback callback; - public Indexer(Client client, MySearchResponse rsp, String newIndex, String newType, boolean withVersion, float waitSeconds) { + public Indexer(Client client, HitsCallback callback, MySearchResponse rsp, String newIndex, String newType, boolean withVersion, float waitSeconds) { + this.callback = callback; logger = Loggers.getLogger(this.getClass()); @@ -59,7 +61,7 @@ public void run() { if (currentResults == 0) break; - MySearchHits res = rsp.hits(); + MySearchHits res = callback.transform(rsp.hits()); if (res == null) break; queryWatch.stop(); diff --git a/src/main/java/com/pannous/es/reindex/ReIndexAction.java b/src/main/java/com/pannous/es/reindex/ReIndexAction.java index 5c55747..2538add 100644 --- a/src/main/java/com/pannous/es/reindex/ReIndexAction.java +++ b/src/main/java/com/pannous/es/reindex/ReIndexAction.java @@ -1,14 +1,9 @@ package com.pannous.es.reindex; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -17,10 +12,6 @@ import org.elasticsearch.rest.*; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.PUT; @@ -126,14 +117,14 @@ public SearchRequestBuilder createScrollSearch(String oldIndexName, String oldTy public Thread reindex(MySearchResponse rsp, String newIndex, String newType, boolean withVersion, float waitSeconds) { - Indexer indexer = new Indexer(client, rsp, newIndex, newType, withVersion, waitSeconds); + Indexer indexer = new Indexer(client, callback(rsp.hits()), rsp, newIndex, newType, withVersion, waitSeconds); Thread indexerThread = EsExecutors.daemonThreadFactory(settings, this.getClass().getCanonicalName()).newThread(indexer); indexerThread.start(); return indexerThread; } - protected MySearchHits callback(MySearchHits hits) { - return hits; + protected HitsCallback callback(MySearchHits hits) { + return new HitsCallback(); } }