path: root/indexer.py
author     yvesf <yvesf-git@xapek.org>    2010-11-20 20:24:19 +0100
committer  yvesf <yvesf-git@xapek.org>    2010-11-20 20:24:19 +0100
commit     f87201ef681eb3e94cb570c5c83a22d2e8b269d2 (patch)
tree       8c98487b84b9656bbf6d4160e619cfbfc168ce6f /indexer.py
parent     f78911646cf53e37c47921f9dcd9702d6e946f54 (diff)
web broken; indexer process Pool
Diffstat (limited to 'indexer.py')
-rw-r--r--    indexer.py    141
1 file changed, 64 insertions, 77 deletions
diff --git a/indexer.py b/indexer.py
index ac14a9e..af47ead 100644
--- a/indexer.py
+++ b/indexer.py
@@ -8,112 +8,99 @@ import whoosh.fields as fields
import time
from cStringIO import StringIO
from Queue import Queue, Empty
-from threading import Thread, Condition
+import multiprocessing as mp
-schema = fields.Schema(
- title=fields.TEXT(stored=True),
- path=fields.ID(stored=True),
+schema_book = fields.Schema(
pagenumber=fields.NUMERIC(stored=True),
+ metadata_docnum=fields.NUMERIC(stored=True),
content=fields.TEXT(stored=True),
- createtime=fields.NUMERIC() )
+ )
-if not os.path.exists("index"):
- create = True
- os.mkdir("index")
- index = create_in(u"index", schema)
+schema_metadata = fields.Schema(
+ title = fields.TEXT(stored=True),
+ path=fields.ID(stored=True,unique=True),
+ createtime=fields.NUMERIC(stored=True) )
+
+if not os.path.exists(u"index"):
+ create_index = True
+ os.mkdir(u"index")
+ index_book = create_in(u"index", schema_book, u"book")
+ index_metadata = create_in(u"index", schema_metadata, u"metadata")
else:
- create = False
- index = open_dir("index")
+ create_index = False
+ index_book = open_dir(u"index", u"book")
+ index_metadata = open_dir(u"index", u"metadata")
-filepaths = Queue()
-documents = Queue(maxsize=5) #PDFWorker should be maximal 5 documents ahead
-notifier = Condition()
+filepaths = []
directory = unicode(sys.argv[1], "utf8")
-searcher = index.searcher()
+searcher_book = index_book.searcher()
+searcher_metadata = index_metadata.searcher()
print u"Walking {0}".format(directory)
filecount = 0
skipped = 0
for path, directories, files in os.walk(directory):
for filename in files:
- if filename.endswith(".pdf"):
+ if filename.endswith(u".pdf"):
filepath = os.path.join(path, filename)
- docnum = create or searcher.document_number(path=filepath)
+ docnum = create_index or searcher_metadata.document_number(path=filepath)
if not docnum:
skipped += 1
else:
- filepaths.put(filepath)
+ filepaths.append(filepath)
filecount += 1
print u"\r{0} files found {1} skipped".format(filecount+skipped, skipped),
print ""
-if not create:
- writer = index.writer()
+if not create_index: #update index for deleted files
+ writer_book = index_book.writer()
+ writer_metadata = index_metadata.writer()
deleted = 0
processed = 0
- for fields in searcher.all_stored_fields():
+ for fields in searcher_metadata.all_stored_fields():
path = fields['path']
processed += 1
if not os.path.exists(path):
- writer.delete_by_term('path', path)
+ writer_book.delete_by_term(u'metadata_docnum', searcher_metadata.document_number(path=path))
+ writer_metadata.delete_by_term('path', path)
deleted += 1
print u"\r{0} pages processed. {1} deleted".format(processed, deleted),
print ""
- writer.commit()
-
-searcher.close()
-
-class PDFWorker(Thread):
- def run(self):
- while True:
- try:
- filepath = filepaths.get(False)
- except Empty:
- break
- try:
- print u"{0} processing {1}".format(self.name, filepath)
- inputfile = pyPdf.PdfFileReader(file(filepath, 'r'))
- title = inputfile.getDocumentInfo().title
- pagenumber = 0
- for page in inputfile.pages:
- print u"{0} processing {1} Page {2}".format(self.name, filepath, pagenumber)
- pagenumber += 1
- content = page.extractText()
- documents.put( {"title":title, "path":filepath, "pagenumber":pagenumber, "content":content, "createtime":time.time() } )
- except Exception, e:
- print u"{0} Exception: {1}".format(self.name, str(e))
- finally:
- print u"{0} finished {1}".format(self.name, filepath)
- filepaths.task_done()
-
-class IndexWorker(Thread):
- def run(self):
- while index != None:
- try:
- doc = documents.get(True, 0.5)
- except Empty,e:
- continue
- print u"{0} adding {1} page {2}".format(self.name, doc['path'], doc['pagenumber'])
- writer = index.writer()
- writer.add_document(**doc)
- writer.commit()
- documents.task_done()
- print u"{0} added {1} page {2}".format(self.name, doc['path'], doc['pagenumber'])
-
-threads = map(lambda i: PDFWorker(), range(1))
-for thread in threads:
- thread.start()
+ writer_book.commit()
+ writer_metadata.commit()
-idx = IndexWorker()
-idx.start()
-print "all running"
+searcher_book.close()
+searcher_metadata.close()
-for thread in threads:
- thread.join()
+def process_file(filepath):
+ print u"{0} processing {1}".format(os.getpid(), filepath)
+    inputfile = pyPdf.PdfFileReader(file(filepath, 'rb'))
+ title = inputfile.getDocumentInfo().title
+ writer_metadata = index_metadata.writer()
+ writer_metadata.add_document(title=title, path=filepath, createtime=time.time())
+ writer_metadata.commit()
+ searcher_metadata = index_metadata.searcher()
+ metadata_docnum = searcher_metadata.document_number(path=filepath)
+ searcher_metadata.close()
-oldindex = index
-index = None
-idx.join()
-print "optimize index"
-oldindex.optimize()
-oldindex.close()
+ pagenumber = 1
+ for page in inputfile.pages:
+ print u"processing {0} Page {1}".format(filepath, pagenumber)
+ content = page.extractText()
+ writer_book = index_book.writer()
+ writer_book.add_document(pagenumber=pagenumber,
+ metadata_docnum=metadata_docnum,
+ content=content)
+ writer_book.commit()
+ pagenumber += 1
+try:
+ pool = mp.Pool()
+    pool.map(process_file, filepaths)
+except ImportError:
+ for filepath in filepaths:
+ process_file(filepath)
+print u"optimize indexes"
+index_metadata.optimize()
+index_metadata.close()
+index_book.optimize()
+index_book.close()
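
The commit replaces the PDFWorker/IndexWorker threads and their queues with a multiprocessing Pool that fans the per-file work out over worker processes. A minimal, self-contained sketch of that pattern, kept separate from the Whoosh and pyPdf specifics above (file names and the per-file job are illustrative only, not part of this repository):

import multiprocessing as mp
import os

def work(filepath):
    # Stand-in for the real per-file job (PDF parsing, per-page indexing).
    # The return value must be picklable so it can travel back to the parent.
    return os.getpid(), filepath

if __name__ == "__main__":
    filepaths = ["a.pdf", "b.pdf", "c.pdf"]   # illustrative paths
    pool = mp.Pool()                          # one worker process per CPU core by default
    # map() calls work() once per list element and blocks until every call has finished;
    # apply(), by contrast, runs the function a single time in one worker.
    results = pool.map(work, filepaths)
    pool.close()
    pool.join()
    print(results)

Because each call runs in a separate process, anything the workers rely on (such as open index writers) has to be created inside the worker or be safe to share across the fork; only the arguments and return values are passed between processes.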