Derzeit läuft die Zuweisung der Jobs über eine andere API. Dieser Prozess soll nun in meine API integriert werden, damit diese komplett unabhängig ist.
Dabei wird folgendes benötigt:
- ablegen/abspeichern aller Requests, bis sie verarbeitet wurden (~100) -> wird eine Datenbank (Redis) benötigt oder reicht eine Queue?
- eigenständige Zuweisung der Jobs
- wenn möglich Priorisierung bei der Abarbeitung -> Endpunkt abhängig -> queue.PriorityQueue() ?
- bestmöglich parallele Abarbeitung (Begrenzung 5 jobs)
bisherige Probleme:
Endpunkte ner/ner_complete funktionieren weder mit ProcessPoolExecutor noch mit RabbitMQ/Celery, da das Keras-Modell nicht picklable ist.
Fragen:
Was wäre der effizienteste Weg alle requests abzuspeichern bis sie verarbeitet wurden ?
Welche Möglichkeiten neben RabbitMQ/Celery gibt es zur Umsetzung von Job-Queues, sodass diese im Hintergrund verarbeitet werden können?
Vielen Dank für eure Hilfe
invoice:
Code: Alles auswählen
from pydantic import BaseModel
class Invoice(BaseModel):
    """Request payload accepted by the API endpoints."""

    # raw document content submitted by the client
    content: str
class OcrResult(BaseModel):
    """Response payload produced by the OCR endpoints."""

    # extracted text / document content returned to the client
    content: str
class NerResult(BaseModel):
    """Response payload produced by the NER endpoints."""

    # recognized entities returned to the client
    content: list
service:
Code: Alles auswählen
from datetime import datetime, timedelta
from timeit import default_timer as timer
from fastapi import BackgroundTasks, Depends, FastAPI, Header, HTTPException, APIRouter, status as stat
from fastapi.security.utils import get_authorization_scheme_param
from fastapi.middleware.cors import CORSMiddleware
from pydantic import AnyHttpUrl, BaseModel, Field
from typing import Optional, Dict
import queue
import requests
import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from uuid import UUID, uuid4
import ktrain
from content.ocr_mode_handler import handle_ocr
from content.invoice import Invoice, OcrResult
from content.ner import get_entities
class Job(BaseModel):
    """In-memory record of one background OCR job."""

    # auto-generated id, used as the key in the module-level `jobs` dict
    uid: UUID = Field(default_factory=uuid4)
    # lifecycle: "in_progress" -> "complete"
    status: str = "in_progress"
    # worker result; None until the job finishes
    # (was `int = None`, which mis-declares the type — pydantic v2 rejects it)
    result: Optional[int] = None
# Application singleton plus in-memory job bookkeeping.
app = FastAPI()
# NOTE(review): planned request queue (priority + size limit) — not wired up yet.
#app.queue_system = queue.PriorityQueue()
#app.queue_limit = 100
router = APIRouter()
# In-memory job store keyed by job uid; contents are lost on restart —
# a persistent store (e.g. Redis) would be needed for durability. TODO confirm.
jobs: Dict[UUID, Job] = {}
async def run_in_process(fn, *args):
    """Run ``fn(*args)`` on the app's process pool and await the result.

    Parameters
    ----------
    fn : callable
        function executed in a worker process (must be picklable)
    *args
        positional arguments forwarded to ``fn``

    Returns
    -------
    whatever ``fn`` returns, once the worker finishes
    """
    # get_running_loop() is the correct call from inside a coroutine;
    # get_event_loop() is deprecated in that context since Python 3.10.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)
async def start_cpu_bound_task(uid: UUID, invoice, mode, token, callback_url, res_width, check_before_ocr) -> None:
    """Run one OCR job in the worker pool, then mark its record complete."""
    job = jobs[uid]
    job.result = await run_in_process(
        __process_request, invoice, mode, token, callback_url, res_width, check_before_ocr
    )
    job.status = "complete"
# Allow cross-origin requests from any origin, with credentials.
# NOTE(review): "*" origins combined with allow_credentials=True is very
# permissive — confirm this is intended for production.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Router whose routes are advertised via `callbacks=` on the OCR endpoints.
app.include_router(router)
# Load the ktrain NER predictor once at import time so it lives in the
# web-server process (the Keras model is not picklable, so it cannot be
# shipped to worker processes).  The `global` statement the original had
# here was a no-op at module scope and has been dropped.
predictor = ktrain.load_predictor('model')
if predictor is None:
    # NOTE(review): load_predictor normally raises on failure rather than
    # returning None; this retry looks defensive — confirm it is ever reached.
    print('reload model')
    predictor = ktrain.load_predictor('model')
@app.get("/")
async def root():
    """Service banner shown at the API root."""
    banner = {"message": "OCR Engine and NER service"}
    return banner
@app.get("/status")
async def status():
    """Report the service version and the current server time."""
    timestamp = datetime.now().strftime(r"%d/%m/%Y %H:%M:%S")
    return {"date and time": timestamp, "version": "v1.7"}
@app.get("/labels")
async def get_labels():
    """Return the label set known to the loaded NER predictor."""
    return {"labels": predictor.get_classes()}
@app.on_event("startup")
async def startup_event():
    """Create the process pool used for CPU-bound OCR jobs.

    max_workers=5 caps parallel OCR jobs at five, matching the stated
    requirement ("Begrenzung 5 Jobs"); the previous default sized the
    pool to the CPU count instead.
    """
    app.state.executor = ProcessPoolExecutor(max_workers=5)
@app.on_event("shutdown")
async def on_shutdown():
    """Dispose of the worker pool when the server stops."""
    executor = app.state.executor
    executor.shutdown()
@app.get("/jobcount")
async def job_counter():
    """Return the number of jobs that are not yet complete.

    The generator already yields the values to add, so the former
    ``sum(x for x in incomplete)`` wrapper was redundant.
    """
    return sum(uncompleted_jobs())
def uncompleted_jobs():
    """Yield 1 for every job whose status is not "complete"."""
    for job in jobs.values():
        if job.status != "complete":
            yield 1
@app.get("/jobstatus/{uid}")
async def status_handler(uid: UUID):
    """Return the job record for *uid*.

    Raises
    ------
    HTTPException
        404 when no job with this id exists (previously a bare KeyError
        surfaced as an HTTP 500).
    """
    try:
        return jobs[uid]
    except KeyError:
        raise HTTPException(
            status_code=stat.HTTP_404_NOT_FOUND,
            detail="Job not found",
        )
def get_token_from_header(*, authorization: str = Header(None)):
    """Extract the bearer token from the Authorization request header.

    Parameters
    ----------
    authorization : str, optional
        raw Authorization header value, by default Header(None)

    Returns
    -------
    str
        the token portion of the header

    Raises
    ------
    HTTPException
        401 when the scheme is missing or not "bearer"
    """
    scheme, token = get_authorization_scheme_param(authorization)
    if scheme.lower() == "bearer":
        return token
    raise HTTPException(
        status_code=stat.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
@app.post("/ner")
async def ner(invoice: Invoice, max_sentence_number: int = 0, return_merged: bool = True, crop_image: int = 50, ocr: bool = True):
    """Run NER (with OCR by default) synchronously and return entities plus timing.

    NOTE: this cannot be offloaded via run_in_process — the Keras model is
    not picklable (TypeError: cannot pickle '_thread.RLock' object).
    """
    started = timer()
    ner_result = get_entities(invoice, max_sentence_number, return_merged, crop_image, ocr, predictor=predictor)
    elapsed_ms = timedelta(seconds=timer() - started) / timedelta(milliseconds=1)
    return {'content': ner_result.content, 'execTime': elapsed_ms}
@app.post("/ner_complete")
async def ner_complete(invoice: Invoice, max_sentence_number: int = 0, return_merged: bool = True, crop_image: int = 0, ocr: bool = False):
    """Run NER on already-extracted text (OCR off by default) and return entities plus timing."""
    started = timer()
    ner_result = get_entities(invoice, max_sentence_number, return_merged, crop_image, ocr, predictor=predictor)
    elapsed_ms = timedelta(seconds=timer() - started) / timedelta(milliseconds=1)
    return {'content': ner_result.content, 'execTime': elapsed_ms}
@app.post("/txt", callbacks=router.routes)
async def ocr_txt(invoice: Invoice, background_tasks: BackgroundTasks, token = Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None, res_width: int=3500, check_before_ocr: bool=True):
    """Accept a document for OCR in plain-text mode; processing happens in the background."""
    job = Job()
    jobs[job.uid] = job
    background_tasks.add_task(
        start_cpu_bound_task, job.uid, invoice, 'txt', token, callback_url, res_width, check_before_ocr
    )
    return {"message": "OCR Processing sent in the background"}
@app.post("/pdfa", callbacks=router.routes)
async def ocr_pdfa(invoice: Invoice, background_tasks: BackgroundTasks, token = Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None, res_width: int=3500, check_before_ocr: bool=True):
    """Accept a document for OCR in PDF/A mode; processing happens in the background."""
    job = Job()
    jobs[job.uid] = job
    background_tasks.add_task(
        start_cpu_bound_task, job.uid, invoice, 'pdfa', token, callback_url, res_width, check_before_ocr
    )
    return {"message": "OCR Processing sent in the background"}
def __process_request(invoice, mode, token, callback_url, res_width, check_before_ocr):
    """Run one OCR job in a worker process and POST the outcome to callback_url.

    Parameters
    ----------
    invoice : Invoice
        API input (document content)
    mode : str
        return as txt or pdfa
    token : str
        validation token forwarded as a bearer token to the callback
    callback_url : str or None
        url for result callback; nothing is posted when None
    res_width : int
        width for image resizing
    check_before_ocr : bool
        checks if image is usable for OCR
    """
    headers = { 'Authorization': 'Bearer ' + token }
    try:
        start = timer()
        result = handle_ocr(invoice, mode, res_width, check_before_ocr)
        time = timedelta(seconds=timer()-start) / timedelta(milliseconds=1)
        payload = {'content': result.content, 'execTime': time}
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        # report the actual status code (the original hard-coded 500 here
        # even though e.status_code was already printed above)
        payload = {"errors": [{"statusCode": e.status_code, "message": e.detail}]}
    except Exception as e:
        # any other failure previously escaped without ever notifying the
        # callback, leaving the caller waiting forever — report it as a 500
        print("unexpected exception", e)
        payload = {"errors": [{"statusCode": 500, "message": str(e)}]}
    if callback_url is not None:
        # timeout so a dead callback endpoint cannot block a worker forever
        requests.post(callback_url, headers=headers, json=payload, timeout=30)