Complex Examples
Much more complex scenarios are also possible. For example, suppose you want to filter documents where the record is of type ‘journal’, the stage is ‘S300’, the publication year is greater than 2010 and less than 2014, the abstract contains ‘heart’, ‘brain’, ‘body’, or ‘number’, and a section contains ‘red’ or ‘black’. While the following example is not a robust full-text search (no stemming, very simple tokenization, etc.), it highlights what can be accomplished with a fairly simple XPath expression.
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap

val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")

val filtered = xmlKeyPair.mapPartitions(recsIter => {
  // Journal (JL) records in stage S300, published 2011-2013, whose abstract mentions
  // 'heart', 'brain', 'body', or 'number' and whose section text mentions 'red' or 'black'
  val xpath = "/xocs:doc[./xocs:meta[xocs:content-type='JL' " +
              "and xocs:item-stage='S300' " +
              "and xocs:cover-date-year > 2010 " +
              "and xocs:cover-date-year < 2014] " +
              "and .//ja:head[.//ce:abstract[tokenize(lower-case(string-join(.//text(),' ')),'\\W+') = ('heart','brain','body','number')]] " +
              "and .//ce:section[tokenize(lower-case(string-join(.//text(),' ')),'\\W+') = ('red','black')]]"
  val namespaces = new HashMap[String,String](Map(
    "xocs" -> "http://www.elsevier.com/xml/xocs/dtd",
    "ja" -> "http://www.elsevier.com/xml/ja/dtd",
    "ce" -> "http://www.elsevier.com/xml/common/dtd"
  ).asJava)
  // Create the processor once per partition and reuse it for every record
  val proc = XPathProcessor.getInstance(xpath, namespaces)
  recsIter.filter(rec => proc.filterString(rec._2))
})

println("Unfiltered Count %s".format(xmlKeyPair.count))
println("Filtered Count %s".format(filtered.count))
Note that a document can contain many ‘sections’. XPath (and XQuery) lets us go beyond normal full-text search capabilities by doing things such as limiting the results to those where ‘red’ or ‘black’ occurs in the first section.
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap

val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")

val filtered = xmlKeyPair.mapPartitions(recsIter => {
  val xpath = "/xocs:doc[./xocs:meta[xocs:content-type='JL' " +
              "and xocs:item-stage='S300' " +
              "and xocs:cover-date-year > 2010 " +
              "and xocs:cover-date-year < 2014] " +
              "and .//ja:head[.//ce:abstract[tokenize(lower-case(string-join(.//text(),' ')),'\\W+') = ('heart','brain','body','number')]] " +
              "and .//ce:section[position()=1 and tokenize(lower-case(string-join(.//text(),' ')),'\\W+') = ('red','black')]]"
  val namespaces = new HashMap[String,String](Map(
    "xocs" -> "http://www.elsevier.com/xml/xocs/dtd",
    "ja" -> "http://www.elsevier.com/xml/ja/dtd",
    "ce" -> "http://www.elsevier.com/xml/common/dtd"
  ).asJava)
  val proc = XPathProcessor.getInstance(xpath, namespaces)
  recsIter.filter(rec => proc.filterString(rec._2))
})

println("Unfiltered Count %s".format(xmlKeyPair.count))
println("Filtered Count %s".format(filtered.count))
One current shortcoming is that the result of an XPath or XQuery evaluation is a string rather than a sequence of nodes. For example, consider the scenario where you would like to return a List of authors for each record, where each author is a separate entry in the List sorted by author surname and each entry keeps the author given name and surname separate.
Below is an approach that we have used. In this scenario, we insert delimiters between the author entries (and between the fields within each entry) and then use split to separate them again on the Spark side. While not ideal, this does help us handle these types of situations. It should also be noted that we could have used an XQuery expression (with an order by clause) to further simplify the code; a sketch of that variant follows the example below.
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap

val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")

val results = xmlKeyPair.mapPartitions(recsIter => {
  // Return the authors as 'given-name^surname' entries joined with '|'
  val xpathAuthors = "string-join((for $x in //*:head/ce:author-group/ce:author return concat($x/ce:given-name,'^',$x/ce:surname)), '|')"
  val namespaces = new HashMap[String,String](Map(
    "ce" -> "http://www.elsevier.com/xml/common/dtd"
  ).asJava)
  val proc = XPathProcessor.getInstance(xpathAuthors, namespaces)
  recsIter.map(rec => proc.evaluateString(rec._2)
    .split('|')                      // one entry per author
    .map(author => {
      val toks = author.split('^')   // separate the given name and surname
      (toks(0), toks(1))
    })
    .toList
    .sortWith((a, b) => a._2 < b._2) // sort by surname
  )
})
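As noted above, an XQuery expression with an order by clause lets the sorting happen inside the query itself, leaving only the delimiter handling to the Spark side. The following is a minimal sketch of that variant; the FLWOR expression is our own illustration rather than something taken from the project documentation, and it assumes XQueryProcessor exposes the same getInstance/evaluateString interface as XPathProcessor.

import com.elsevier.spark_xml_utils.xquery.XQueryProcessor
import scala.collection.JavaConverters._
import java.util.HashMap

val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")

val results = xmlKeyPair.mapPartitions(recsIter => {
  // Sort the authors by surname inside the XQuery expression
  val xqueryAuthors = "string-join((for $x in //*:head/ce:author-group/ce:author " +
                      "order by $x/ce:surname " +
                      "return concat($x/ce:given-name,'^',$x/ce:surname)), '|')"
  val namespaces = new HashMap[String,String](Map(
    "ce" -> "http://www.elsevier.com/xml/common/dtd"
  ).asJava)
  val proc = XQueryProcessor.getInstance(xqueryAuthors, namespaces)
  // Only the splitting of the delimited string remains on the Spark side
  recsIter.map(rec => proc.evaluateString(rec._2)
    .split('|')
    .map(author => { val toks = author.split('^'); (toks(0), toks(1)) })
    .toList
  )
})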
While many examples show only a single XPath or XQuery expression, it is possible (and common) to apply multiple expressions: simply create a processor for each expression and evaluate each one against the record.
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap

val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")

val results = xmlKeyPair.mapPartitions(partition => {
  val xpathDocid = "string(/xoe:enhanced-document/xocs:doc[xocs:item/item/ait:process-info/ait:status[@type='core']]/xocs:meta/cto:group-id)"
  val xpathSrcId = "string(/xoe:enhanced-document/xocs:doc/xocs:item/item[ait:process-info/ait:status[@type='core']]/bibrecord/head/source/@srcid)"
  val xpathPubYr = "string(/xoe:enhanced-document/xocs:doc[xocs:item/item/ait:process-info/ait:status[@type='core']]/xocs:meta/xocs:pub-year)"
  val xpathSrcType = "string(/xoe:enhanced-document/xocs:doc[xocs:item/item/ait:process-info/ait:status[@type='core']]/xocs:meta/xocs:srctype)"
  val xpathDocType = "string(/xoe:enhanced-document/xocs:doc[xocs:item/item/ait:process-info/ait:status[@type='core']]/xocs:meta/cto:doctype)"
  val namespaces = new HashMap[String,String](Map(
    "xoe" -> "http://www.elsevier.com/xml/xoe/dtd",
    "xocs" -> "http://www.elsevier.com/xml/xocs/dtd",
    "ce" -> "http://www.elsevier.com/xml/ani/common",
    "cto" -> "http://www.elsevier.com/xml/cto/dtd",
    "ait" -> "http://www.elsevier.com/xml/ani/ait"
  ).asJava)
  // One processor per expression, created once per partition
  val procDocId = XPathProcessor.getInstance(xpathDocid, namespaces)
  val procSrcId = XPathProcessor.getInstance(xpathSrcId, namespaces)
  val procPubYr = XPathProcessor.getInstance(xpathPubYr, namespaces)
  val procSrcType = XPathProcessor.getInstance(xpathSrcType, namespaces)
  val procDocType = XPathProcessor.getInstance(xpathDocType, namespaces)
  // per xml record
  partition.map(rec => {
    val docId = procDocId.evaluateString(rec._2)
    val srcId = procSrcId.evaluateString(rec._2)
    val pubYr = procPubYr.evaluateString(rec._2)
    val srcType = procSrcType.evaluateString(rec._2)
    val docType = procDocType.evaluateString(rec._2)
    (docId, srcId, pubYr, srcType, docType)
  })
})
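The result is an RDD of tuples, so the usual Spark operations apply from here. As a quick usage sketch (the output path below is hypothetical), the extracted metadata could be inspected or written out as tab-delimited text:

// Inspect a few extracted records
results.take(5).foreach(println)

// Persist the tuples as tab-delimited text (hypothetical output path)
results.map { case (docId, srcId, pubYr, srcType, docType) =>
  Seq(docId, srcId, pubYr, srcType, docType).mkString("\t")
}.saveAsTextFile("s3n://spark-xml-utils/output/metadata")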
We are just scratching the surface of what we would like to provide with spark-xml-utils and of what is possible. Within Labs, we have been using it for over six months and have had great success. If you have any questions or ideas for other complex examples, please let us know and we will add them.