From f87201ef681eb3e94cb570c5c83a22d2e8b269d2 Mon Sep 17 00:00:00 2001 From: yvesf Date: Sat, 20 Nov 2010 20:24:19 +0100 Subject: web broken; indexer process Pool --- indexer.py | 141 ++++++++++++++++++++++++++++--------------------------------- 1 file changed, 64 insertions(+), 77 deletions(-) (limited to 'indexer.py') diff --git a/indexer.py b/indexer.py index ac14a9e..af47ead 100644 --- a/indexer.py +++ b/indexer.py @@ -8,112 +8,99 @@ import whoosh.fields as fields import time from cStringIO import StringIO from Queue import Queue, Empty -from threading import Thread, Condition +import multiprocessing as mp -schema = fields.Schema( - title=fields.TEXT(stored=True), - path=fields.ID(stored=True), +schema_book = fields.Schema( pagenumber=fields.NUMERIC(stored=True), + metadata_docnum=fields.NUMERIC(stored=True), content=fields.TEXT(stored=True), - createtime=fields.NUMERIC() ) + ) -if not os.path.exists("index"): - create = True - os.mkdir("index") - index = create_in(u"index", schema) +schema_metadata = fields.Schema( + title = fields.TEXT(stored=True), + path=fields.ID(stored=True,unique=True), + createtime=fields.NUMERIC(stored=True) ) + +if not os.path.exists(u"index"): + create_index = True + os.mkdir(u"index") + index_book = create_in(u"index", schema_book, u"book") + index_metadata = create_in(u"index", schema_metadata, u"metadata") else: - create = False - index = open_dir("index") + create_index = False + index_book = open_dir(u"index", u"book") + index_metadata = open_dir(u"index", u"metadata") -filepaths = Queue() -documents = Queue(maxsize=5) #PDFWorker should be maximal 5 documents ahead -notifier = Condition() +filepaths = [] directory = unicode(sys.argv[1], "utf8") -searcher = index.searcher() +searcher_book = index_book.searcher() +searcher_metadata = index_metadata.searcher() print u"Walking {0}".format(directory) filecount = 0 skipped = 0 for path, directories, files in os.walk(directory): for filename in files: - if filename.endswith(".pdf"): + if filename.endswith(u".pdf"): filepath = os.path.join(path, filename) - docnum = create or searcher.document_number(path=filepath) + docnum = create_index or searcher_metadata.document_number(path=filepath) if not docnum: skipped += 1 else: - filepaths.put(filepath) + filepaths.append(filepath) filecount += 1 print u"\r{0} files found {1} skipped".format(filecount+skipped, skipped), print "" -if not create: - writer = index.writer() +if not create_index: #update index for deleted files + writer_book = index_book.writer() + writer_metadata = index_metadata.writer() deleted = 0 processed = 0 - for fields in searcher.all_stored_fields(): + for fields in searcher_metadata.all_stored_fields(): path = fields['path'] processed += 1 if not os.path.exists(path): - writer.delete_by_term('path', path) + writer_book.delete_by_term(u'metadata_docnum', searcher_metadata.document_number(path=path)) + writer_metadata.delete_by_term('path', path) deleted += 1 print u"\r{0} pages processed. {1} deleted".format(processed, deleted), print "" - writer.commit() - -searcher.close() - -class PDFWorker(Thread): - def run(self): - while True: - try: - filepath = filepaths.get(False) - except Empty: - break - try: - print u"{0} processing {1}".format(self.name, filepath) - inputfile = pyPdf.PdfFileReader(file(filepath, 'r')) - title = inputfile.getDocumentInfo().title - pagenumber = 0 - for page in inputfile.pages: - print u"{0} processing {1} Page {2}".format(self.name, filepath, pagenumber) - pagenumber += 1 - content = page.extractText() - documents.put( {"title":title, "path":filepath, "pagenumber":pagenumber, "content":content, "createtime":time.time() } ) - except Exception, e: - print u"{0} Exception: {1}".format(self.name, str(e)) - finally: - print u"{0} finished {1}".format(self.name, filepath) - filepaths.task_done() - -class IndexWorker(Thread): - def run(self): - while index != None: - try: - doc = documents.get(True, 0.5) - except Empty,e: - continue - print u"{0} adding {1} page {2}".format(self.name, doc['path'], doc['pagenumber']) - writer = index.writer() - writer.add_document(**doc) - writer.commit() - documents.task_done() - print u"{0} added {1} page {2}".format(self.name, doc['path'], doc['pagenumber']) - -threads = map(lambda i: PDFWorker(), range(1)) -for thread in threads: - thread.start() + writer_book.commit() + writer_metadata.commit() -idx = IndexWorker() -idx.start() -print "all running" +searcher_book.close() +searcher_metadata.close() -for thread in threads: - thread.join() +def process_file(filepath): + print u"{0} processing {1}".format(os.getpid(), filepath) + inputfile = pyPdf.PdfFileReader(file(filepath, 'r')) + title = inputfile.getDocumentInfo().title + writer_metadata = index_metadata.writer() + writer_metadata.add_document(title=title, path=filepath, createtime=time.time()) + writer_metadata.commit() + searcher_metadata = index_metadata.searcher() + metadata_docnum = searcher_metadata.document_number(path=filepath) + searcher_metadata.close() -oldindex = index -index = None -idx.join() -print "optimize index" -oldindex.optimize() -oldindex.close() + pagenumber = 1 + for page in inputfile.pages: + print u"processing {0} Page {1}".format(filepath, pagenumber) + content = page.extractText() + writer_book = index_book.writer() + writer_book.add_document(pagenumber=pagenumber, + metadata_docnum=metadata_docnum, + content=content) + writer_book.commit() + pagenumber += 1 +try: + pool = mp.Pool() + pool.apply(process_file, filepaths) +except ImportError: + for filepath in filepaths: + process_file(filepath) +print u"optimize indexes" +index_metadata.optimize() +index_metadata.close() +index_book.optimze() +index_book.close() -- cgit v1.2.1