Code: Alles auswählen
from multiprocessing import Pool
def f(x ):
return x**3
if __name__ == '__main__':
with Pool(2) as p:
print(p.map(f, [7, 2, 3]))
Code: Alles auswählen
from multiprocessing import Pool
def f(x ):
return x**3
if __name__ == '__main__':
with Pool(2) as p:
print(p.map(f, [7, 2, 3]))
Code: Alles auswählen
from multiprocessing import Process, Value, Array
import time
def add_100(number):
for _ in range(100):
time.sleep(0.01)
number.value += 1
def add_100_array(numbers):
for _ in range(30):
time.sleep(0.01)
for i in range(len(numbers)):
numbers[i] += 1
if __name__ == "__main__":
shared_number = Value('i', 0)
print('Beginn mit Werten:', shared_number.value)
shared_array = Array('d', [1.0, 3.0 ,6.0])
print('Array zu Beginn:', shared_array[:])
process1 = Process(target=add_100, args=(shared_number,))
process2 = Process(target=add_100, args=(shared_number,))
process3 = Process(target=add_100_array, args=(shared_array,))
process4 = Process(target=add_100_array, args=(shared_array,))
process1.start()
process2.start()
process3.start()
process4.start()
process1.join()
process2.join()
process3.join()
process4.join()
if __name__ == '__main__':
print('Letzter Wert:', shared_number.value)
print('Array am Ende:', shared_array[:])
Code: Alles auswählen
from functools import reduce
from multiprocessing import Pool, cpu_count
def square(x):
"""Function """
return x **3
if __name__ == "__main__":
# print the number of cores
# print("Number of cores available equals %s" % cpu_count())
# create a pool of workers
with Pool() as pool:
r = range(1, 5)
result = pool.map(square, r )
total = reduce(lambda x , y : x * y, result)
print("das Produkt der ersten 5 Zahlen = :%s" % total)
print("Hier die Anzahl der CPU-Kerne", cpu_count())
Code: Alles auswählen
rom multiprocessing import Process ,Pipe , current_process
from datetime import datetime
import time, random
dta1 = 1
def getSensorData(conn,dta1):
print("getsensorData gerufen")
ctr = 0
while ctr <10:
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
#print("time: ",timestamp)
time.sleep(0.15)
dta_1 = random.randint(0,99)
dta_2 = random.randint(0,99)
dta_3 = random.randint(0,99)
conn.send([timestamp, dta_1,dta_2, dta_3])
ctr += 1
def Control_Proc():
p = current_process()
print('Starting:', p.name, p.pid)
parent_conn, child_conn = Pipe()
ptemp = Process(name="getSensorData", target=getSensorData,\
args=(child_conn,dta1))
ptemp.daemon = True
ptemp.start()
while(True):
while parent_conn.poll():
timestamp, data_01,data_02, data_03 \
= parent_conn.recv()
print(timestamp, " data01: ",data_01, "data_02: "\
,data_02, "data_03: ",data_03)
time.sleep(1 )
if __name__ == '__main__':
Control_Proc()
Code: Alles auswählen
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
def calculate(func, args):
result = func(*args)
return '%s bedeutet, dass %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
def potenz(a,b):
time.sleep(0.5*random.random())
return a **b
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
TASKS3 = [(potenz,(i,9))for i in range(5)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Hier die ungeordneten Ergebnisse von X*x, sowie X+X' )
print('Das Beispiel zeigt, wie queues verwendet werden, um Aufgaben an \
diverse Arbeitsprozesse weiterzuleiten und die Ergebnisse zu sammeln:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
for task in TASKS3:
task_queue.put(task)
for i in range(len(TASKS3)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()
Code: Alles auswählen
from time import time
import multiprocessing as mp
from multiprocessing.pool import ThreadPool
import numpy as np
import pickle
def main():
arr = np.ones((1024, 1024, 1024), dtype=np.uint8)
expected_sum = np.sum(arr)
with ThreadPool(1) as threadpool:
start = time()
assert (
threadpool.apply(np.sum, (arr,)) == expected_sum
)
print("Thread pool:", time() - start)
with mp.get_context("spawn").Pool(1) as processpool:
start = time()
assert (
processpool.apply(np.sum, (arr,))
== expected_sum
)
print("Process pool:", time() - start)
if __name__ == "__main__":
main()
Code: Alles auswählen
import time
import random
from scipy import integrate
import numpy as np
from multiprocessing import Process, Queue, current_process, freeze_support
from scipy import integrate
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
def calculate(func, args):
#result = func(*args)
return '%s bedeutet, dass %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def bogen(a, b):
time.sleep(0.1)
return lambda x: x**3 +2*x**2 -4
print ("Numerische Integration mit Quadratur von Gauß")
f = lambda x: x**3 +2*x**2 -4
print(integrate.quadrature(f, 0.0, 1.0))
print(1/9.0) # Gewichtsfunktion 1/(1-x**2) **1/2
print(integrate.quadrature(np.sin, 0.0, np.pi/2))
print("Die Bogenlänge = ",np.sin(np.pi/2)-np.sin(1) ) # analytical result
def test():
NUMBER_OF_PROCESSES = 1
TASKS1 = [(bogen, (i,1)) for i in range(2)]
# Create queues
task_queue = Queue()mport time
import random
from scipy import integrate
import numpy as np
from multiprocessing import Process, Queue, current_process, freeze_support
from scipy import integrate
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
def calculate(func, args):
#result = func(*args)
return '%s bedeutet, dass %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def bogen(a, b):
time.sleep(0.1)
return lambda x: x**3 +2*x**2 -4
print ("Numerische Integration mit Quadratur von Gauß")
f = lambda x: x**3 +2*x**2 -4
print(integrate.quadrature(f, 0.0, 1.0))
print(1/9.0) # Gewichtsfunktion 1/(1-x**2) **1/2
print(integrate.quadrature(np.sin, 0.0, np.pi/2))
print("Die Bogenlänge = ",np.sin(np.pi/2)-np.sin(1) ) # analytical result
def test():
NUMBER_OF_PROCESSES = 1
TASKS1 = [(bogen, (i,1)) for i in range(2)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print(' -Bogenlänge- ist hier der einzige Arbeitsprozess')
print(' der natürlich erweitert werden kann. ')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print(' -Bogenlänge- ist hier der einzige Arbeitsprozess')
print(' der natürlich erweitert werden kann. ')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()
Code: Alles auswählen
mport time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
def plotten(x,y):
time.sleep(0.5*random.random())
return
import matplotlib.pyplot as plot
import numpy
import math
from math import *
x = numpy.empty(300)
y = numpy.empty(300)
for i in range(300):
x[i] = (i - 1) * 0.02
y[i] = math.sin(x[i])
plot.plot(x, y)
plot.xlabel('x')
plot.ylabel('x')
plot.title('FUNKTIONSPLOT im MULTIPROZESS t')
plot.grid(True)
plot.savefig("plot.png")
plot.show()
#################################
def test():
NUMBER_OF_PROCESSES =3
TASKS1 = [(mul, (i, 7)) for i in range(5)]
TASKS2 = [(plus, (i, 8)) for i in range(5)]
TASKS3= [(plotten (i, 1)) for i in range(1)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Hier die restlichen Ergebnisse:')
for i in range(len(TASKS3)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
for i in range(len(TASKS3)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()
Code: Alles auswählen
import plotly.graph_objects as go
import numpy as np
feature_x = np.arange(0, 50, 2)
feature_y = np.arange(0, 50, 3)
# Creating 2-D grid of features
[X, Y] = np.meshgrid(feature_x, feature_y)
Z = (X / 3) + (Y / 4)
# plotting the figure
fig = go.Figure(data =
go.Heatmap(x = feature_x, y = feature_y, z = Z,))
fig.show()
Code: Alles auswählen
##################
import plotly.graph_objects as px
import numpy as np
# creating random data through randomint
# function of numpy.random
np.random.seed(42)
# Data to be Plotted
random_x = np.random.randint(1, 101, 100)
random_y = np.random.randint(1, 101, 100)
plot = px.Figure(data=[px.Scatter(
x=random_x,
y=random_y,
mode='markers',)
])
# Add dropdown
plot.update_layout(
updatemenus=[
dict(
buttons=list([
dict(
args=["type", "scatter"],
label="Scatter Plot",
method="restyle"
),
dict(
args=["type", "bar"],
label="Bar Chart",
method="restyle"
)
]),
direction="down",
),
]
)
plot.show()
Code: Alles auswählen
########################## Low-Code Python
import plotly.graph_objects as go
import numpy as np
# Data to be plotted
x = np.outer(np.linspace(-2, 2, 30), np.ones(30))
y = x.copy().T
z = np.cos(x ** 2 + y ** 2)
# plotting the figure
fig = go.Figure(data=[go.Surface(x=x, y=y, z=z)])
fig.show()
#######################
###################
Code: Alles auswählen
for x in range(1, 6):
print(repr(x).rjust(2), repr(x*x).rjust(3), end=' ')
# Achte auf die Benutzung von 'end' in der vorherigen Zeile
print(repr(x*x*x).rjust(4))
print()
for x in range(1, 6):
print('{0:2d} {1:3d} {2:4d}'.format(x, x*x, x*x*x))
print()
for x in range(1, 11):
print(repr(x).rjust(2), repr(x*x).rjust(3), end=' ')
# Achte auf die Benutzung von 'end' in der vorherigen Zeile
print(repr(x*x*x).rjust(5))
print()
for x in range(1, 11):
print('{1:2d} {2:3d} {3:5d}'.format(x, x*x, x*x*x,x*x*x*x))
[code]
Code: Alles auswählen
#Beispiel mit F-String
language = " etwa ab Python3.7 "
school = "F-String"
print(f" jetzt kommt {language} auch noch from {school} dazu .")
print()
Version = '3.11.4'
Anmerkung ='das ist OK'
print(f' mein Python hat die Version {Version} {Anmerkung}')
Code: Alles auswählen
import math
goldenerSchnitt = (1+math.sqrt(5))/2
print(f"{goldenerSchnitt=}")
print(f"{goldenerSchnitt = }")
print(f"{goldenerSchnitt = :.6f}")
print(f"{(1+math.sqrt(5))/2 = :.6f}")
#####################
APPLES = .50
BREAD = 1.50
CHEESE = 2.25
numApples = 3
numBread = 10
numCheese = 6
prcApples = numApples * APPLES
prcBread = numBread* BREAD
prcCheese = numCheese * CHEESE
strApples = 'Apples'
strBread = 'Rye Bread'
strCheese = 'Cheese'
total = prcBread + prcCheese + prcApples
print(f'{"My Grocery List":^30s}')
print(f'{"="*30}')
print(f'{strApples:10s}{numApples:10d}\t${prcApples:>5.2f}')
print(f'{strBread:10s}{numBread:10d}\t${prcBread:>5.2f}')
print(f'{strCheese:10s}{numCheese:10d}\t${prcCheese:>5.2f}')
print(f'{"Total:":>20s}\t${total:>5.2f}')
Code: Alles auswählen
#!/usr/bin/python
#Ein Beispiel mit den derzeitigen Formenn der Formatierung von Strings und Zahlen
Name = 'Max'
Alter = 28
print('%s ist %d Jahre alt' % (Name, Alter))
#Dies ist die älteste Option.
print()
print('{} ist {} Jahre alt'.format(Name, Alter))
#Seit Python 3.0 format wurde die Funktion eingeführt
print()
print(f'{Name} ist {Alter}Jahre alt')
#Pythons F-Strings .
Code: Alles auswählen
wieoft = {"und": 100, "oder": 87, "nicht": 31,"weniger": 15}
for word in wieoft:
output = "Das Wort '" + word + "' kommt " + str(wieoft[word]) + "-mal vor ."
print(output)
print()
####################
langer_text = "Ich hab', ich hab', ich hab', ich hab', ich hab', zu wenig Platz"
print(len(langer_text))
print("-->{:50}<--".format(langer_text))
##Per Default wird der einzusetzende Ausdruck innerhalb seines Slots links ausgerichtet.
#Wir können explizit andere Ausrichtungen erzwingen:
print("-->{:<40}<--".format("links"))
print("---")
print("-->{:>40}<--".format("rechts"))
print("---")
print("-->{:^40}<--".format("zentriert"))
Code: Alles auswählen
# "Wann war was ?
import datetime
now = datetime.datetime.now()
tage_vorher= now - datetime.timedelta(days=7) #beliebig
print(f'{tage_vorher:%d-%m-%Y-%H %M:%S}')
#''2020-10-13 20:24:17'
print(' Tage vorher , nachher und gleiche Tageszeit')
print(f'{now:%d-%m-%Y- %H:%M:%S}')
#'2020-10-23 20:24:17'
#######################
# Zahlensysteme konvertieren mit F-Strings
bases = {
"b": "bin",
"o": "oct",
"x": "hex",
"X": "HEX",
"d": "decimal"
}
print("' bin' ,'oct ' ,'hex ' ,'HEX ', 'dec'")
for n in range(1, 21):
for base, desc in bases.items():
print(f"{n:10{base}}", end=' ')
print()
print("' bin' ,'oct ' ,'hex ' ,'HEX ', 'dec'")
Code: Alles auswählen
import numpy as np
from numpy import *
import scipy.integrate as integrate
from scipy.integrate import quad
num1 = 4
num2 = log(2)
print(f"Potenz von {num1} und {num2} = {num1 ** num2}.")
print(f"Produkt von {num1} und {num2} = {num1 * num2}.")
print(f"Summe von {num1} und {num2} = {num1 + num2}.")
print(f"Differenz von {num1} und {num2} = {num1 - num2}.")
print(f"Quotient von {num1} und {num2} = {num1/ num2}.")
################################
def integrand(x, a, b):
return a*x**4 + b
x=1
a= log(e)
b = exp(- 0.2)
I = quad(integrand, 0, 1, args=(a,b))
print( f "Bestimmtes Integral von (a*x**2 **b) = {I} ")
Code: Alles auswählen
# Using format() method
format_method = timeit.timeit("""
char = 'Iron man'
name = 'Stan Lee'
'{} was created by {}.'.format(char, name)
""", number=1000000)
print(format_method)
# Using f-string
fst = timeit.timeit("""
char = 'Iron man'
name = 'Stan Lee'
f'{char} was created by {name}.'
""", number=1000000)
print(fst)
# Using Template Class
tcl = timeit.timeit("""
from string import Template
char = 'Iron man'
name = 'Stan Lee'
my_str = Template("$x was created by $y.")
my_str.substitute(x=char, y=name)""", number=1000000)
print(tcl)
Code: Alles auswählen
def arithmetic_progression(n , limit):
return list(range(n, limit +1 , n))
print(arithmetic_progression(3 ,10))
###############################
#Wie normalisiert man ein NumPy-Array auf einen Einheitsvektor?
#Um einen NumPy-Array auf einen Einheitsvektor zu normalisieren,
#kann man die numpy.linalg.norm Funktion verwenden,
#die Größe des Vektors berechnen, und den Array anschließend durch diese Größe teilen.
# Dafür hier ein Beispielcode-snippet (Schnipsel)
import numpy as np
# ARRAY initialisieren
arr = np.array([1, 2, 3])
# Berechne dei Größe des Vektors
magnitude = np.linalg.norm(arr)
# Normalisierung mit Teiling des Arrays durch die Größe
unit_vec = arr / magnitude
print(unit_vec)