Part 3: Add Spark to a Big Data Application with Text Search Capability

In this third and final part of this blog post, I provide more technical details on the changes we made to Parquet and Spark. This post will be of interest to a software developer. If a reader is familiar with Parquet and Spark with sufficient technical details, the reader may appreciate this content more, though I have tried to describe the details to be understood by others without that knowledge. I also encourage the reader to read the first two parts of this blog post to get the context of these changes that I describe.

Changes to Parquet Codebase

We chose to write the embedded text search index within the Parquet file in the Parquet format itself. We settled on this for expediency, and also to see if such an idea is feasible. We were aware of Parquet format’s very efficient encoding mechanism, and it seemed like a reasonable choice for experimentation to represent text index information that Lucene maintains. Before proceeding on this path, we did a spot check for a sample text file with log messages, comparing the overall file size generated by Lucene to that of in our intended Parquet format. We share this data in the Performance Experience section below. This choice meant that we could concentrate on modifying only Parquet software to get better performance. When building the index, for analyzing text of different languages, the application uses Lucene’s Analyzer component and maintains the text index  using Parquet’s encoding techniques, before writing out the index using the modified Parquet library. The information in the index is quite similar to that of Lucene.

A Parquet file reader first reads the file’s footer to locate the file offsets at which each column starts. Note that data in a Parquet file is stored one column after another. The format specification makes no assumption that the footer follows immediately after the last column; the reader completely goes by the offsets it reads from the footer that is at the end of the file. We use this property to create a hole, for the index, between the last column and the footer. In that hole, we write the text search index in Parquet format followed by the footer of the data columns. The Parquet footer of the search index precedes that of the outer Parquet file. See the picture below, for a visualization of this. In order to read the index as a Parquet file, we need the offset at which the index footer starts and its size, since the technique of seeking to the end of file to read the footer no longer works for the index. These required bits of information are stored in outer footer’s extensible metadata section as name-value pairs, an extension facility provided by Parquet format specification to store arbitrary name-value pairs by an application.

 

big data application

 

We made enhancement to Parquet’s writing API so that the index stream is considered as an auxiliary of the data stream. Rows are added to data stream and index stream independently, but when data stream is closed, the auxiliary index stream is also automatically closed resulting in the above file format.

We also changed how rows can be incrementally added to a stream writing to a Parquet file. Before the change, the Parquet library code assumed that its internal encoded memory state when flushed to a file can be discarded without any ill effect. However, that assumption is no longer valid when rows are appended incrementally and the encoded content is periodically flushed to files. So we introduced an option to flush to a file but not discard the internal state, anticipating additional rows in future. This avoided re-encoding of earlier rows that have been flushed to file before, saving a significant computing cost for encoding into Parquet format. And also the writing application did not have to buffer earlier rows till the file size reaches the HDFS block size limit. From space efficiency point of view, buffering rows in Parquet’s encoded format should be better than having to buffer them in raw text format, though we did not measure this cost saving.

Changes to Spark Codebase

We chose to extend Spark SQL using built-in search filter functions such as containscontains_exact, starts_with, and so on. We modeled search filter push down on existing filter push down code for Parquet file reader. The new code mimics that code path. One wrinkle is that we need to access the footer of the Parquet file to find out whether the file has an embedded text search index in order to see if search filter push down is possible. The queries work even when there is  no embedded text search index available, but with degraded performance.

Performance Experience

The performance numbers reported here should be treated as indicative rather than definitive as in a benchmark. The numbers  are based on a sample log file of a sshd daemon, which is synthetically generated. The numbers implicitly reflect the idiosyncrasy inherent in the generated input. A more rigorous analysis of different types of input or on larger sizes of data has not been performed.

 

big data application

 

We should be careful not to draw the conclusion that Parquet format that we have chosen is more efficient than the index format of Lucene. This is just a single data point. It is possible that Lucene is storing some auxiliary index information that helps faster query processing or some index compacting operation has not kicked in because of smaller data size. It should simply be interpreted as an encouraging data point for this format.

Earlier in this post, I mentioned how we changed Parquet API to add rows incrementally to a stream writing to a Parquet file, which is flushed periodically. Before this modification,  the writing of 1 million synthetic log lines of sshd took about roughly 9.6 seconds on my laptop, which mostly consisted of an avoidable computing cost of re-encoding earlier documents in parquet encoding format. After the modification, we were writing the encoded buffer every 2 seconds, after incrementally adding 18 to 22 thousand log lines in each interval. Even when the buffered log line count approached 1 million, the actual writing operation did not take more than 1.3 seconds, well within the 2 second periodic interval. There is a downside to this approach: the generated text index is not as compact which degrades search query performance. The size of the new parquet file compared to the most compact possible, without any periodic flushing, increased by 46 percent, for this sample dataset. This can be remedied by doing a index compacting operation, which will incur one time additional cpu and I/O cost, when no longer new rows are incrementally added to a file that reached the limit of a HDFS block size.

Getting interactive sub-second text search query response time using Spark proved much harder, even for a small dataset of 1 million synthetic log lines of sshd. Using Lucene library, such a text search database can be queried in less than 100 millisecond, though in this measurement no network communication and serialization/deserialization cost is included. Initially, we implemented text search query as a SQL join operation between the index and data tables, just to get a sense of the cost. The end-to-end query execution time was several seconds. We got the reinforcement that without a search query filter push down operation to Parquet library we will not achieve our goal. Our internal measurement also showed that the number of rows in the index that need to be scanned and examined should be as few as possible. With text search filter push down implemented and re-organizing the index structure within the Parquet format, we were able to execute the same query under 400 ms, which took multi-seconds without the changes. This number measures elapsed time of the query execution that includes communication and serialization cost of result in a Spark program. As any one working on performance knows, there is always room for improving performance; this situation is no different.

Conclusion

Our preliminary experience shows that text search as an extension of Spark is a viable and desirable option. Further performance improvement and lower latency for making ingested rows available to downstream applications is also possible. For lower latency, exploration of writing incrementally to Tachyon, a memory storage system integrated with Spark, is a promising option. For better search index lookup, implementation of skipping a columnar index data page based on minimum and maximum values in a page header should improve query execution time. Also researching other index structures, including Lucene’s internal representation of its index will provide clues to further improvement in performance.

Acknowledgement

Alosh Bennett (formerly with Informatica) and Jaspreet Singh also worked on this project making Spark and Parquet suitable for text search. They contributed ideas, researched implementation options, and more importantly, contributed code to Spark and Parquet for the project. I thank them for their contributions.

You can read:

Part 1: Add Spark to a Big Data Application with Text Search Capability

Part 2: Add Spark to a Big Data Application with Text Search Capability

Comments