Vademecum PySpark
Questi paragrafi vogliono cercare di condensare alcune metodologie/operazioni tra le più frequenti, ma possono non essere esaustivi nel livello di dettaglio e di possibilità.
Per le fonti ufficiali consultare la documentazione, per vedere altri esempi di codice, visitare il sito https://sparkbyexamples.com/.
Le Data Sources (i.e. tabelle) vengono caricate all'interno dei workflow come oggetti DataFrame
(docs); su tali oggetti è possibile operare come fossero delle tabelle di un Database relazionale.
A seguito una serie di snippet (non esaustivi) di codice riassuntivi delle operazioni base necessarie per operare su tali oggetti.
Note: nei seguenti esempi con l'abbreviazione df
si farà riferimento ad un DataFrame (tabella) generico.
Nei blocchi di codice, le linee che iniziano con il carattere >
indicano le righe con i comandi, quelle senza gli output; se si copia il codice, rimuovere >
e adattare l'indentazione.
Accesso alle colonne di un DataFrame
Prendiamo per esempio il DataFrame employees
mostrato qui di seguito:
+---+------+-------+------+
| ID| Name|Surname|Salary|
+---+------+-------+------+
| 1| John| Smith| 4.2|
| 2|Hannah| Green| 4.2|
+---+------+-------+------+
Lavorando con delle tabelle, è fondamentale sapere come "accedere" alle colonne, ovvero come sono definiti i riferimenti ad esse.
-
Attributo dell'oggetto DataFrame [case sensitive]
employees.ID
-
Accesso
dict
-like (i.e. nome colonna tra parentesi quadre) [case insensitive]employees["ID"]
-
Con funzione
col
[case insensitive]
Case sensitivity
Per i metodi case sensitive è necessario utilizzare il nome con maiuscole e minuscole che combacino con quelle attese; per esempio
Comando | Correttezza |
---|---|
employees.ID |
|
employees.Id |
|
employees.Name |
|
employees.NAME |
Al contrario, per i metodi case insensitive la stringa può contenere capitalizzazione a piacere, quindi per esempio employees["surname"]
e col("SALARY")
sono entrambi frammenti di codice validi.
Select
L'istruzione df.select()
(docs) serve a selezionare le colonne in un DataFrame. Come argomenti accetta una lista di colonne che diventeranno quelle presenti nel DataFrame.
Nota: per visualizzare un risultato, utilizzare l'istruzione .show()
Esempio
> employees.select(employees.Salary,"Surname").show()
+------+-------+
|Salary|Surname|
+------+-------+
| 4.2| Smith|
| 4.2| Green|
+------+-------+
Per togliere i duplicati da un DataFrame è presente l'istruzione df.distinct()
che elimina le linee doppie tenendone una sola istanza.
Where
L'istruzione df.where()
(docs) permette di filtrare le righe di una tabella in base a delle condizioni. Le condizioni semplici possono essere combinate utilizzando gli operatori logici &
(AND), |
(OR) e le parentesi per dare la giusta priorità ai vari operatori.
Operazioni logiche
Uguale (==
)
Diverso (!=
)
Confronto (>, <, >=, <=
)
NULL e NOT NULL (.isNull(), .isNotNull()
)
Between (.between(low,high)
)
Per la definizione di date vedi paragrafo Utility
Lista di valori validi (.isin([...])
)
Negazione (~
)
In questo caso se si ha una colonna nel DataFrame che contiene valori booleani, con l'operatore ~
è possibile negarne il valore ( quindi i valori True
diventano False
e viceversa).
Combinazione di condizioni
Nel caso più semplice si può inserire una condizione singola all'interno dell'istruzione .where()
come nel seguente esempio:
> import pyspark.sql.functions as F
> employees.where(F.col("salary") < 2).show()
+---+------+-------+------+
| ID| Name|Surname|Salary|
+---+------+-------+------+
+---+------+-------+------+
Per poter combinare le condizioni, è necessario racchiudere ciascuna condizione booleana tra parentesi tonde e utilizzare gli operatori &
e |
.
> import pyspark.sql.functions as F
> employees.where((F.col("Name") != "John") & (F.col("ID") > 0)).show()
+---+------+-------+------+
| ID| Name|Surname|Salary|
+---+------+-------+------+
| 2|Hannah| Green| 4.2|
+---+------+-------+------+
Join
L'operazione di .join()
(docs) è quella che permette di unire due DataFrame specificando le condizioni sulle colonne delle due tabelle. Come tabelle di esempio verranno utilizzate le seguenti:
employees cars ownership
+---+------+-------+------+ +---+----------+-------+ +---------+------+
| ID| Name|Surname|Salary| | ID| Casa|Modello| |ID_person|ID_car|
+---+------+-------+------+ +---+----------+-------+ +---------+------+
| 1| John| Smith| 4.2| | 10| Fiat| Punto| | 1| 10|
| 2|Hannah| Green| 4.2| | 20| Opel| Corsa| | 1| 20|
+---+------+-------+------+ | 30|Alfa Romeo| Giulia| +---------+------+
+---+----------+-------+
L'operazione join è un metodo di un oggetto DataFrame, come parametri si aspetta il secondo DataFrame con cui combinare i record, le condizioni da applicare e la modalità di join.
df.join(other, on, how)
Esempio 1
> cars.join(ownership, cars.ID == ownership.ID_car, "inner") \
> .select("Casa","Modello") \
> .show()
+----+-------+
|Casa|Modello|
+----+-------+
|Fiat| Punto|
|Opel| Corsa|
+----+-------+
La query in esempio mostra le auto che sono state comprate da qualcuno. Il terzo parametro (how) è specificato con la stringa "inner"
, indicando il tipo di join che verrà applicato. In questo caso poteva essere omesso dato che il default value è, per l'appunto, "inner"
.
Esempio 2
> result = employees.join(ownership, employees.ID == ownership.ID_person, "left") \
> .join(cars, cars.ID == ownership.ID_car, "left")
> result.show()
+---+------+-------+------+---------+------+----+----+-------+
| ID| Name|Surname|Salary|ID_person|ID_car| ID|Casa|Modello|
+---+------+-------+------+---------+------+----+----+-------+
| 2|Hannah| Green| 4.2| null| null|null|null| null|
| 1| John| Smith| 4.2| 1| 10| 10|Fiat| Punto|
| 1| John| Smith| 4.2| 1| 20| 20|Opel| Corsa|
+---+------+-------+------+---------+------+----+----+-------+
La seguente query mostra come unire le informazioni delle persone con le relative auto possedute; da notare il fatto che è possibile ottenere delle colonne omonime all'interno del DataFrame risultante dei vari join. Il metodo più semplice per poter differenziare le colonne omonime, è attraverso il DataFrame di origine nel seguente modo:
> result.select("Name", cars.ID, "Modello").show()
+------+----+-------+
| Name| ID|Modello|
+------+----+-------+
|Hannah|null| null|
| John| 10| Punto|
| John| 20| Corsa|
+------+----+-------+

 Esempio 3
> ownership.join(employees, [employees.ID==ownership.ID_person, employees.Salary > 10]).show()
+---------+------+---+----+-------+------+
|ID_person|ID_car| ID|Name|Surname|Salary|
+---------+------+---+----+-------+------+
+---------+------+---+----+-------+------+
Per impostare condizioni multiple di join, si passa come secondo parametro una lista di condizioni booleane. Le condizioni, ovviamente, sono lo stesso tipo di oggetti menzionati nel paragrafo riguardante il Where 
 Altri esempi: https://sparkbyexamples.com/spark/spark-sql-dataframe-join/
Group by
Per poter calcolare dei valori aggregati (come in SQL le operazioni SUM
, COUNT
, AVG
, ecc.) sono presenti in modo simile i comandi anche in PySpark.
> aggr = employees.join(ownership, ownership.ID_person==employees.ID).groupBy(employees.ID)
> aggr.count().show()
+---+-----+
| ID|count|
+---+-----+
| 1| 2|
+---+-----+
Utilizzando l'operazione .groupBy(*cols)
su un DataFrame, è possibile passare una lista di campi sui quali raggruppare e, a partire da quell'oggetto (e.g. aggr
nell'esempio), è possibile computare i valori aggregati.
Per computare più statistiche sullo stesso aggregato, consultare la documentazione.
Per la lista completa dei metodi aggregati applicabili dopo un .groupBy()
, consultare la relativa pagina.
Aggiungere/computare una nuova colonna
È possibile definire degli oggetti colonna e aggiungerli a DataFrame esistenti, come mostrato di seguito:
> import pyspark.sql.functions as F
> iniziali = F.concat(F.substring(employees.Name,1,1), F.substring(employees.Surname,1,1))
> emp_1 = employees.withColumn("Initials",iniziali)
> emp_1.show()
+---+------+-------+------+--------+
| ID| Name|Surname|Salary|Initials|
+---+------+-------+------+--------+
| 1| John| Smith| 4.2| JS|
| 2|Hannah| Green| 4.2| HG|
+---+------+-------+------+--------+
Nell'esempio viene dichiarata la colonna iniziali
come funzione di colonne e viene aggiunta raccogliendo l'output della seconda riga: il comando .withColumn("name", column)
permette di aggiungere la colonna al dataframe utilizzando il nome "name"
.
Le colonne definite possono anche essere utilizzate nell'istruzione di .select()
.
Utility
Oggetti di tipo data in Python
Per poter confrontare colonne con date e timestamp è necessario utilizzare gli oggetti datetime.datetime
(docs).
> from datetime import datetime
> date_1 = datetime(2021, 12, 31)
> print(date_1)
2021-12-31 00:00:00
> s = "20220125"
> date_2 = datetime.strptime(s,"%Y%m%d")
> print(date_2)
2022-01-25 00:00:00
Colonne con valore costanti
Per dichiarare una colonna con un valore costante viene utilizzato il metodo pyspark.sql.functions.lit(value)
.
Cambiare nome ad una colonna
Per cambiare il nome ad una colonna è possibile farlo con:
- Rinominando [
df.withColumnRenamed("old","new")
]
> renamed = employees.withColumnRenamed("ID","codice_impiegato")
> renamed.show()
+----------------+------+-------+------+
|codice_impiegato| Name|Surname|Salary|
+----------------+------+-------+------+
| 1| John| Smith| 4.2|
| 2|Hannah| Green| 4.2|
+----------------+------+-------+------+
Nota
Il DataFrame employees
non contiene la modifica!
- Dando un'alias [
column.alias("name")
]
> employees.select(employees.Name.alias("nome")).show()
> renamed.show()
+------+
| nome|
+------+
| John|
|Hannah|
+------+
Nota
Non viene cambiato il nome nel DataFrame employees
, viene cambiato solo nell'operazione di visualizzazione.