Skip to content
This repository was archived by the owner on Aug 16, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ target/
nb-configuration.xml
*~
data/
deploy.sh
deploy.sh
.idea/
*.iml
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

<groupId>com.pannous.es</groupId>
<artifactId>reindex</artifactId>
<version>0.90.0-SNAPSHOT</version>
<version>0.20.6</version>
<packaging>jar</packaging>

<name>Reindex Plugin</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<elasticsearch.version>0.90.0</elasticsearch.version>
<elasticsearch.version>0.20.6</elasticsearch.version>
</properties>

<dependencies>
Expand Down
58 changes: 35 additions & 23 deletions src/main/java/com/pannous/es/reindex/ExampleUsage.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package com.pannous.es.reindex;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
Expand All @@ -15,6 +11,11 @@
import org.json.JSONException;
import org.json.JSONObject;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

/**
* Class to use the reindex plugin as rewrite/refeed plugin - directly from
* java.
Expand Down Expand Up @@ -60,23 +61,30 @@ public static void main(String[] args) {
Settings emptySettings = ImmutableSettings.settingsBuilder().build();
RestController contrl = new RestController(emptySettings);
ReIndexAction action = new ReIndexAction(emptySettings, client, contrl) {
@Override protected MySearchHits callback(MySearchHits hits) {
SimpleList res = new SimpleList(hitsPerPage, hits.totalHits());
for (MySearchHit h : hits.getHits()) {
try {
String str = new String(h.source(), charset);
RewriteSearchHit newHit = new RewriteSearchHit(h.id(), h.version(), str);
String someField = newHit.get("some_field");
if (someField.contains("some content")) {
newHit.put("some_field", "IT WORKS!");
@Override
protected HitsCallback callback(MySearchHits hits) {

return new HitsCallback() {
@Override
public MySearchHits transform(MySearchHits hits) {
SimpleList res = new SimpleList(hitsPerPage, hits.totalHits());
for (MySearchHit h : hits.getHits()) {
try {
String str = new String(h.source(), charset);
RewriteSearchHit newHit = new RewriteSearchHit(h.id(), h.version(), str);
String someField = newHit.get("some_field");
if (someField.contains("some content")) {
newHit.put("some_field", "IT WORKS!");
}

res.add(newHit);
} catch (UnsupportedEncodingException ex) {
throw new RuntimeException(ex);
}
}

res.add(newHit);
} catch (UnsupportedEncodingException ex) {
throw new RuntimeException(ex);
return res;
}
}
return res;
};
}
};
// first query, further scroll-queries in reindex!
Expand Down Expand Up @@ -105,7 +113,8 @@ public void add(MySearchHit hit) {
hits.add(hit);
}

@Override public Iterable<MySearchHit> getHits() {
@Override
public Iterable<MySearchHit> getHits() {
return hits;
}

Expand Down Expand Up @@ -152,15 +161,18 @@ public JSONObject put(String key, Object obj) {
}
}

@Override public String id() {
@Override
public String id() {
return id;
}

@Override public long version() {
@Override
public long version() {
return version;
}

@Override public byte[] source() {
@Override
public byte[] source() {
try {
return json.toString().getBytes(charset);
} catch (UnsupportedEncodingException ex) {
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/pannous/es/reindex/HitsCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.pannous.es.reindex;

public class HitsCallback {
public MySearchHits transform(MySearchHits searchHits) {
return searchHits;
}
}
120 changes: 120 additions & 0 deletions src/main/java/com/pannous/es/reindex/Indexer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package com.pannous.es.reindex;

import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class Indexer implements Runnable {

private final ESLogger logger;
private Client client;
private MySearchResponse rsp;
private String newIndex;
private String newType;
private boolean withVersion;
private float waitSeconds;
private HitsCallback callback;


public Indexer(Client client, HitsCallback callback, MySearchResponse rsp, String newIndex, String newType, boolean withVersion, float waitSeconds) {
this.callback = callback;

logger = Loggers.getLogger(this.getClass());

this.client = client;
this.rsp = rsp;
this.newIndex = newIndex;
this.newType = newType;
this.withVersion = withVersion;
this.waitSeconds = waitSeconds;
}

@Override
public void run() {

boolean flushEnabled = false;
long total = rsp.hits().totalHits();
int collectedResults = 0;
int failed = 0;
while (true) {
if (collectedResults > 0 && waitSeconds > 0) {
try {
Thread.sleep(Math.round(waitSeconds * 1000));
} catch (InterruptedException ex) {
break;
}
}
StopWatch queryWatch = new StopWatch().start();
int currentResults = rsp.doScoll();
if (currentResults == 0)
break;

MySearchHits res = callback.transform(rsp.hits());
if (res == null)
break;
queryWatch.stop();
StopWatch updateWatch = new StopWatch().start();
failed += bulkUpdate(res, newIndex, newType, withVersion).size();
if (flushEnabled)
client.admin().indices().flush(new FlushRequest(newIndex)).actionGet();

updateWatch.stop();
collectedResults += currentResults;
logger.debug("Progress " + collectedResults + "/" + total
+ ". Time of update:" + updateWatch.totalTime().getSeconds() + " query:"
+ queryWatch.totalTime().getSeconds() + " failed:" + failed);
}
String str = "found " + total + ", collected:" + collectedResults
+ ", transfered:" + (float) rsp.bytes() / (1 << 20) + "MB";
if (failed > 0)
logger.warn(failed + " FAILED documents! " + str);
else
logger.info(str);
}

Collection<Integer> bulkUpdate(MySearchHits objects, String indexName,
String newType, boolean withVersion) {
BulkRequestBuilder brb = client.prepareBulk();
for (MySearchHit hit : objects.getHits()) {
if (hit.id() == null || hit.id().isEmpty()) {
logger.warn("Skipped object without id when bulkUpdate:" + hit);
continue;
}

try {
IndexRequest indexReq = Requests.indexRequest(indexName).type(newType).id(hit.id()).source(hit.source());
if (withVersion)
indexReq.version(hit.version());

brb.add(indexReq);
} catch (Exception ex) {
logger.warn("Cannot add object:" + hit + " to bulkIndexing action." + ex.getMessage());
}
}
if (brb.numberOfActions() > 0) {
BulkResponse rsp = brb.execute().actionGet();
if (rsp.hasFailures()) {
List<Integer> list = new ArrayList<Integer>(rsp.items().length);
for (BulkItemResponse br : rsp.items()) {
if (br.isFailed())
list.add(br.itemId());
}
return list;
}
}
return Collections.emptyList();
}

}
Loading