Vai al contenuto

Vademecum PySpark

Questi paragrafi vogliono cercare di condensare alcune metodologie/operazioni tra le più frequenti, ma possono non essere esaustivi nel livello di dettaglio e di possibilità.

Per le fonti ufficiali consultare la documentazione, per vedere altri esempi di codice, visitare il sito https://sparkbyexamples.com/.

Le Data Sources (i.e. tabelle) vengono caricate all'interno dei workflow come oggetti DataFrame (docs); su tali oggetti è possibile operare come fossero delle tabelle di un Database relazionale. A seguito una serie di snippet (non esaustivi) di codice riassuntivi delle operazioni base necessarie per operare su tali oggetti.

Note: nei seguenti esempi con l'abbreviazione df si farà riferimento ad un DataFrame (tabella) generico. Nei blocchi di codice, le linee che iniziano con il carattere > indicano le righe con i comandi, quelle senza gli output; se si copia il codice, rimuovere > e adattare l'indentazione.

Accesso alle colonne di un DataFrame

Prendiamo per esempio il DataFrame employees mostrato qui di seguito:

+---+------+-------+------+
| ID|  Name|Surname|Salary|
+---+------+-------+------+
|  1|  John|  Smith|   4.2|
|  2|Hannah|  Green|   4.2|
+---+------+-------+------+

Lavorando con delle tabelle, è fondamentale sapere come "accedere" alle colonne, ovvero come sono definiti i riferimenti ad esse.

  • Attributo dell'oggetto DataFrame [case sensitive]

    employees.ID

  • Accesso dict-like (i.e. nome colonna tra parentesi quadre) [case insensitive]

    employees["ID"]

  • Con funzione col [case insensitive]

    import pyspark.sql.functions as F
    F.col("ID")
    

Case sensitivity

Per i metodi case sensitive è necessario utilizzare il nome con maiuscole e minuscole che combacino con quelle attese; per esempio

Comando Correttezza
employees.ID ✔
employees.Id ❌
employees.Name ✔
employees.NAME ❌

Al contrario, per i metodi case insensitive la stringa può contenere capitalizzazione a piacere, quindi per esempio employees["surname"]e col("SALARY") sono entrambi frammenti di codice validi.

Select

L'istruzione df.select() (docs) serve a selezionare le colonne in un DataFrame. Come argomenti accetta una lista di colonne che diventeranno quelle presenti nel DataFrame.

Nota: per visualizzare un risultato, utilizzare l'istruzione .show() Esempio

> employees.select(employees.Salary,"Surname").show()
+------+-------+
|Salary|Surname|
+------+-------+
|   4.2|  Smith|
|   4.2|  Green|
+------+-------+

Per togliere i duplicati da un DataFrame è presente l'istruzione df.distinct() che elimina le linee doppie tenendone una sola istanza.

Where

L'istruzione df.where() (docs) permette di filtrare le righe di una tabella in base a delle condizioni. Le condizioni semplici possono essere combinate utilizzando gli operatori logici &(AND), |(OR) e le parentesi per dare la giusta priorità ai vari operatori.

Operazioni logiche

Uguale (==)

employees.Name == "John"
df.column_one == df.column_two

Diverso (!=)

employees.Name != "John" 
df.column_one != df.column_two

Confronto (>, <, >=, <=)

employees.Salary >= 2
df.column_three < df.column_four

NULL e NOT NULL (.isNull(), .isNotNull())

df.attributo_opzionale.isNull()
df.data_chiusura.isNotNull()

Between (.between(low,high))

 employees.Salary.between(1.5, 2.5)
df.data.between(datetime(2021,12,26), datetime(2021,12,31))

Per la definizione di date vedi paragrafo Utility

Lista di valori validi (.isin([...]))

employees.Surname.isin(["Smith","Williams", "Brown"])
df.tipologia_pagamento.isin([1, 2, 5])

Negazione (~)

~df.boolean_column

In questo caso se si ha una colonna nel DataFrame che contiene valori booleani, con l'operatore ~ è possibile negarne il valore ( quindi i valori True diventano False e viceversa).

Combinazione di condizioni

Nel caso più semplice si può inserire una condizione singola all'interno dell'istruzione .where() come nel seguente esempio:

> import pyspark.sql.functions as F
> employees.where(F.col("salary") < 2).show()
+---+------+-------+------+
| ID|  Name|Surname|Salary|
+---+------+-------+------+
+---+------+-------+------+

Per poter combinare le condizioni, è necessario racchiudere ciascuna condizione booleana tra parentesi tonde e utilizzare gli operatori & e | .

> import pyspark.sql.functions as F
> employees.where((F.col("Name") != "John") & (F.col("ID") > 0)).show()
+---+------+-------+------+
| ID|  Name|Surname|Salary|
+---+------+-------+------+
|  2|Hannah|  Green|   4.2|
+---+------+-------+------+

Join

L'operazione di .join()(docs) è quella che permette di unire due DataFrame specificando le condizioni sulle colonne delle due tabelle. Come tabelle di esempio verranno utilizzate le seguenti:

employees                     cars                      ownership
+---+------+-------+------+   +---+----------+-------+  +---------+------+
| ID|  Name|Surname|Salary|   | ID|      Casa|Modello|  |ID_person|ID_car|
+---+------+-------+------+   +---+----------+-------+  +---------+------+
|  1|  John|  Smith|   4.2|   | 10|      Fiat|  Punto|  |        1|    10|
|  2|Hannah|  Green|   4.2|   | 20|      Opel|  Corsa|  |        1|    20|
+---+------+-------+------+   | 30|Alfa Romeo| Giulia|  +---------+------+ 
                              +---+----------+-------+  

L'operazione join è un metodo di un oggetto DataFrame, come parametri si aspetta il secondo DataFrame con cui combinare i record, le condizioni da applicare e la modalità di join. df.join(other, on, how)

Esempio 1

> cars.join(ownership, cars.ID == ownership.ID_car, "inner") \
> .select("Casa","Modello") \
> .show()
+----+-------+
|Casa|Modello|
+----+-------+
|Fiat|  Punto|
|Opel|  Corsa|
+----+-------+

La query in esempio mostra le auto che sono state comprate da qualcuno. Il terzo parametro (how) è specificato con la stringa "inner", indicando il tipo di join che verrà applicato. In questo caso poteva essere omesso dato che il default value è, per l'appunto, "inner".

Esempio 2

> result = employees.join(ownership, employees.ID == ownership.ID_person, "left") \
> .join(cars, cars.ID == ownership.ID_car, "left")
> result.show()
+---+------+-------+------+---------+------+----+----+-------+
| ID|  Name|Surname|Salary|ID_person|ID_car|  ID|Casa|Modello|
+---+------+-------+------+---------+------+----+----+-------+
|  2|Hannah|  Green|   4.2|     null|  null|null|null|   null|
|  1|  John|  Smith|   4.2|        1|    10|  10|Fiat|  Punto|
|  1|  John|  Smith|   4.2|        1|    20|  20|Opel|  Corsa|
+---+------+-------+------+---------+------+----+----+-------+

La seguente query mostra come unire le informazioni delle persone con le relative auto possedute; da notare il fatto che è possibile ottenere delle colonne omonime all'interno del DataFrame risultante dei vari join. Il metodo più semplice per poter differenziare le colonne omonime, è attraverso il DataFrame di origine nel seguente modo:

> result.select("Name", cars.ID, "Modello").show()
+------+----+-------+
|  Name|  ID|Modello|
+------+----+-------+
|Hannah|null|   null|
|  John|  10|  Punto|
|  John|  20|  Corsa|
+------+----+-------+

Esempio 3

> ownership.join(employees, [employees.ID==ownership.ID_person, employees.Salary > 10]).show()
+---------+------+---+----+-------+------+
|ID_person|ID_car| ID|Name|Surname|Salary|
+---------+------+---+----+-------+------+
+---------+------+---+----+-------+------+

Per impostare condizioni multiple di join, si passa come secondo parametro una lista di condizioni booleane. Le condizioni, ovviamente, sono lo stesso tipo di oggetti menzionati nel paragrafo riguardante il Where Altri esempi: https://sparkbyexamples.com/spark/spark-sql-dataframe-join/

Group by

Per poter calcolare dei valori aggregati (come in SQL le operazioni SUM, COUNT, AVG, ecc.) sono presenti in modo simile i comandi anche in PySpark.

> aggr = employees.join(ownership, ownership.ID_person==employees.ID).groupBy(employees.ID)
> aggr.count().show()
+---+-----+
| ID|count|
+---+-----+
|  1|    2|
+---+-----+

Utilizzando l'operazione .groupBy(*cols) su un DataFrame, è possibile passare una lista di campi sui quali raggruppare e, a partire da quell'oggetto (e.g. aggr nell'esempio), è possibile computare i valori aggregati. Per computare più statistiche sullo stesso aggregato, consultare la documentazione. Per la lista completa dei metodi aggregati applicabili dopo un .groupBy(), consultare la relativa pagina.

Aggiungere/computare una nuova colonna

È possibile definire degli oggetti colonna e aggiungerli a DataFrame esistenti, come mostrato di seguito:

> import pyspark.sql.functions as F
> iniziali = F.concat(F.substring(employees.Name,1,1), F.substring(employees.Surname,1,1))
> emp_1 = employees.withColumn("Initials",iniziali)
> emp_1.show()
+---+------+-------+------+--------+
| ID|  Name|Surname|Salary|Initials|
+---+------+-------+------+--------+
|  1|  John|  Smith|   4.2|      JS|
|  2|Hannah|  Green|   4.2|      HG|
+---+------+-------+------+--------+

Nell'esempio viene dichiarata la colonna iniziali come funzione di colonne e viene aggiunta raccogliendo l'output della seconda riga: il comando .withColumn("name", column) permette di aggiungere la colonna al dataframe utilizzando il nome "name". Le colonne definite possono anche essere utilizzate nell'istruzione di .select().

Utility

Oggetti di tipo data in Python

Per poter confrontare colonne con date e timestamp è necessario utilizzare gli oggetti datetime.datetime (docs).

> from datetime import datetime
> date_1 = datetime(2021, 12, 31)
> print(date_1)
2021-12-31 00:00:00
> s = "20220125"
> date_2 = datetime.strptime(s,"%Y%m%d")
> print(date_2)
2022-01-25 00:00:00

Colonne con valore costanti

Per dichiarare una colonna con un valore costante viene utilizzato il metodo pyspark.sql.functions.lit(value).

> import pyspark.sql.functions as F
> column_stringa_vuota = F.lit("")
> column_zero = F.lit(0)

Cambiare nome ad una colonna

Per cambiare il nome ad una colonna è possibile farlo con:

  • Rinominando [df.withColumnRenamed("old","new")]
> renamed = employees.withColumnRenamed("ID","codice_impiegato")
> renamed.show()
+----------------+------+-------+------+
|codice_impiegato|  Name|Surname|Salary|
+----------------+------+-------+------+
|               1|  John|  Smith|   4.2|
|               2|Hannah|  Green|   4.2|
+----------------+------+-------+------+

Nota

Il DataFrame employees non contiene la modifica!

  • Dando un'alias [column.alias("name")]
> employees.select(employees.Name.alias("nome")).show()
> renamed.show()
+------+
|  nome|
+------+
|  John|
|Hannah|
+------+

Nota

Non viene cambiato il nome nel DataFrame employees, viene cambiato solo nell'operazione di visualizzazione.