Job Queue

Felix92:

Hi, I'm currently running into the following problems with my API:
At the moment job assignment is handled by a separate API; this process should now be integrated into my API so that it is completely self-contained.
The following is needed:
- store/persist all requests until they have been processed (~100) -> is a database (Redis) needed, or is a queue sufficient?
- independent assignment of the jobs
- if possible, prioritization during processing -> depends on the endpoint -> queue.PriorityQueue()? (see the sketch below)
- processing as parallel as possible (limited to 5 jobs)
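For illustration, a minimal in-process sketch of these requirements with asyncio.PriorityQueue, assuming the actual work can be wrapped in a coroutine (process() and the payloads are placeholders):

Code: Select all

import asyncio
import random


QUEUE_LIMIT = 100  # hold at most ~100 pending requests; put() waits beyond that
WORKERS = 5        # process at most 5 jobs in parallel


async def process(payload: str) -> None:
    # stand-in for the real OCR/NER work
    await asyncio.sleep(random.random())
    print("done:", payload)


async def worker(job_queue: asyncio.PriorityQueue) -> None:
    while True:
        priority, payload = await job_queue.get()
        try:
            await process(payload)
        finally:
            job_queue.task_done()


async def main() -> None:
    job_queue = asyncio.PriorityQueue(maxsize=QUEUE_LIMIT)
    workers = [asyncio.create_task(worker(job_queue)) for _ in range(WORKERS)]
    for i in range(10):
        # endpoint-dependent priority: lower number = processed first
        await job_queue.put((0 if i % 3 == 0 else 1, f"request-{i}"))
    await job_queue.join()  # wait until every queued job is done
    for w in workers:
        w.cancel()


asyncio.run(main())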

Problems so far:
The ner/ner_complete endpoints work neither with ProcessPoolExecutor nor with RabbitMQ/Celery, because the Keras model is not picklable.
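A common workaround, as a sketch only: load the model once per worker process via the executor's initializer, so only plain request data ever crosses the process boundary and the model itself never has to be pickled (predictor.predict() is assumed to fit the model type):

Code: Select all

from concurrent.futures import ProcessPoolExecutor

import ktrain


predictor = None  # one predictor instance per worker process


def init_worker():
    # runs once inside every worker process: the model is loaded there
    # directly and therefore never has to be pickled
    global predictor
    predictor = ktrain.load_predictor('model')


def run_ner(invoice_content: str):
    # only plain data (here a str) crosses the process boundary
    return predictor.predict(invoice_content)


if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=5, initializer=init_worker)
    future = executor.submit(run_ner, "some invoice text")
    print(future.result())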

Questions:
What would be the most efficient way to store all requests until they have been processed?
Besides RabbitMQ/Celery, what options are there for implementing job queues so that they are processed in the background?
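For illustration, one lighter Redis-backed option would be RQ: it sends only the function's dotted path plus the pickled arguments to the worker, so the Keras model never has to be pickled as long as each worker loads it on import. A rough sketch, assuming a running local Redis server (tasks.py and ocr_job are hypothetical):

Code: Select all

# tasks.py -- the job function must live in a module the RQ worker can import
def ocr_job(invoice_content: str) -> str:
    return invoice_content.upper()  # stand-in for the real OCR work


# producer side (e.g. inside the FastAPI endpoint)
from redis import Redis
from rq import Queue

q = Queue(connection=Redis())
job = q.enqueue("tasks.ocr_job", "some invoice text")  # returns immediately
print(job.id, job.get_status())  # poll status/result later via the job id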

Thanks a lot for your help :)

invoice:

Code: Select all

from pydantic import BaseModel


class Invoice(BaseModel):
    """API input
    """
    content: str


class OcrResult(BaseModel):
    """API output
    """
    content: str


class NerResult(BaseModel):
    """API output
    """
    content: list

service:

Code: Select all

from datetime import datetime, timedelta
from timeit import default_timer as timer

from fastapi import BackgroundTasks, Depends, FastAPI, Header, HTTPException, APIRouter, status as stat
from fastapi.security.utils import get_authorization_scheme_param
from fastapi.middleware.cors import CORSMiddleware
from pydantic import AnyHttpUrl, BaseModel, Field
from typing import Optional, Dict
import queue

import requests

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from uuid import UUID, uuid4

import ktrain

from content.ocr_mode_handler import handle_ocr
from content.invoice import Invoice, OcrResult
from content.ner import get_entities


class Job(BaseModel):
    """API job
    """
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    result: Optional[int] = None


app = FastAPI()
#app.queue_system = queue.PriorityQueue()
#app.queue_limit = 100
router = APIRouter()
jobs: Dict[UUID, Job] = {}


async def run_in_process(fn, *args):
    """runs job as process
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


async def start_cpu_bound_task(uid: UUID, invoice, mode, token, callback_url, res_width, check_before_ocr) -> None:
    """starts one job per request
    """
    jobs[uid].result = await run_in_process(__process_request, invoice, mode, token, callback_url, res_width, check_before_ocr)
    jobs[uid].status = "complete"


app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
app.include_router(router)

# load the model once at import time ("global" has no effect at module level,
# and re-calling load_predictor on a None result would just return None again)
predictor = ktrain.load_predictor('model')


@app.get("/")
async def root():
    """Entrypoint
    """
    return {"message": "OCR Engine and NER service"}

@app.get("/status")
async def status():
    """status time
    """
    version = "v1.7"
    now = datetime.now()
    dt_string = now.strftime(r"%d/%m/%Y %H:%M:%S")
    return {"date and time": dt_string, "version": version}

@app.get("/labels")
async def get_labels():
    """returns a list of the current labels
    """
    labels = predictor.get_classes()
    return {"labels": labels}


@app.on_event("startup")
async def startup_event():
    """startup for one job
    """
    app.state.executor = ProcessPoolExecutor()
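    # Note: without max_workers the pool defaults to os.cpu_count() processes;
    # ProcessPoolExecutor(max_workers=5) would enforce the 5-jobs limit from above.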


@app.on_event("shutdown")
async def on_shutdown():
    """shutdown for one job
    """
    app.state.executor.shutdown()


@app.get("/jobcount")
async def job_counter():
    incomplete = uncompleted_jobs()
    return sum(x for x in incomplete)


def uncompleted_jobs():
    for jobEntryKey in jobs.keys():
        if jobs[jobEntryKey].status != "complete":
            yield 1


@app.get("/jobstatus/{uid}")
async def status_handler(uid: UUID):
    """job id
    """
    return jobs[uid]


def get_token_from_header(*, authorization: str = Header(None)):
    """token from request header

    Parameters
    ----------
    authorization : str, optional
        auth scheme, by default Header(None)

    Returns
    -------
    str
        token from header

    Raises
    ------
    credentials_exception
        if token is not valid
    """
    credentials_exception = HTTPException(
        status_code=stat.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    scheme, token = get_authorization_scheme_param(authorization)
    if scheme.lower() != "bearer":
        raise credentials_exception
    return token


@app.post("/ner")
async def ner(invoice: Invoice, max_sentence_number: int = 0, return_merged: bool = True, crop_image: int = 50, ocr: bool = True):
    # TypeError: cannot pickle '_thread.RLock' object  -> Issue: multiprocessing cant pickle h5
    #result = await run_in_process(get_entities, invoice, max_sentence_number, return_merged, crop_image,
    #                              ocr, predictor)
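    # A possible fix: load the predictor once per worker process via the
    # executor's "initializer" (see the sketch in the problem description
    # above) instead of passing it as an argument, so it never gets pickled.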

    start = timer()
    result = get_entities(invoice, max_sentence_number, return_merged, crop_image, ocr, predictor=predictor)
    time = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
    return {'content': result.content, 'execTime': time}


@app.post("/ner_complete")
async def ner_complete(invoice: Invoice, max_sentence_number: int = 0, return_merged: bool = True, crop_image: int = 0, ocr: bool = False):
    start = timer()
    result = get_entities(invoice, max_sentence_number, return_merged, crop_image, ocr, predictor=predictor)
    time = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
    return {'content': result.content, 'execTime': time}


@app.post("/txt", callbacks=router.routes)
async def ocr_txt(invoice: Invoice, background_tasks: BackgroundTasks, token = Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None, res_width: int=3500, check_before_ocr: bool=True):
    """txt endpoint
    """
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, invoice, 'txt', token, callback_url, res_width, check_before_ocr)
    return {"message": "OCR Processing sent in the background"} 

 
@app.post("/pdfa", callbacks=router.routes)
async def ocr_pdfa(invoice: Invoice, background_tasks: BackgroundTasks, token = Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None, res_width: int=3500, check_before_ocr: bool=True):
    """pdfa endpoint
    """
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, invoice, 'pdfa', token, callback_url, res_width, check_before_ocr)
    return {"message": "OCR Processing sent in the background"} 

def __process_request(invoice, mode, token, callback_url, res_width, check_before_ocr):
    """one job execution and returns result to URL

    Parameters
    ----------
    invoice : class
        API input
    mode : str
        return as txt or pdfa
    token : str
        validation token
    callback_url : str
        url for result callback
    res_width : int
        width for image resizing
    check_before_ocr : bool
        checks if image is usable for OCR 
    """
    headers = { 'Authorization': 'Bearer ' + token }
    try:
        start = timer()
        result = handle_ocr(invoice, mode, res_width, check_before_ocr)    
        time = timedelta(seconds=timer()-start) / timedelta(milliseconds=1)   
        requests.post(callback_url, headers=headers, json={'content': result.content, 'execTime': time})
    except HTTPException as e:
        print("exception caught", e.status_code, e.detail)
        requests.post(callback_url, headers=headers, json={"errors": [{"statusCode": e.status_code, "message": e.detail}]})