1. initilaztion
1. spark session
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("name") \
.config("spark.some.config.option", "some-value") \
2. spark context
1. from spark session
sc = spark.sparkContext
2. from spark context
conf = SparkConf().setAppName("KMeans").setMaster("local[*]")
sc = SparkContext(conf =conf)
sc = SparkContext.getOrCreate(conf)
2. paritition by index
.mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)\
this get rid of the first line(rdd) in the file.
3. opeartions of spark dataframe
1. read csv file into dataframe, with separator
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
df_demand = spark.read\
.option("header", "true")\
.option("inferSchema", "true")\
.option("mode", "DROPMALFORMED")\
.option("delimiter", "|")\
2. Basic Operations
3. read dictionary into spark dataframe
- dict -> pandas.df -> spark.df
https://stackoverflow.com/questions/43751509/how-to-create-new-dataframe-with-dict?rq=1 - dict -> spark.rdd (json rdd) -> json parser -> spark.df
when use pandas df to create spark df, if you want to perserve indices,
use: spark_df = spark.createDataFrame(pandasdf.reset_index(drop=False))
must have reset_index(drop=False) to preserve indices, as spark SQL has no concept of index
3. Unique value of a column
select(“col name”).distinct()
.drop(“col name”)
4. drop col
.drop(“col name”)
5. spark df to pandas df
.toPandas() # cost is expensive, try to minimum use it! it’s an action, collect all the data to the driver
6.sort by col, (descending)
from pyspark.sql.functions import desc
>= 10”)
.orderBy(‘Year’, ascending=False)
7. filter
In pyspark, filter on dataframe doesn’t take functions that returns a boolean,
it only takes SQL experssion that returns a boolean
If you want it to take a boolean function, use udf, sample:
def my_filter(col):
return Boolean
Make sure includes:
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType
8. map on column
df.withColumn(‘new column’, udf(‘old column’)))
9. coalesce and repartition
repartition(n) = coalesce(n, shuffle = true)
4. Wide dependency and narrow dependency,
Orgin we have M partitions, we repartiton to N partitions.
narrow dependency, usually we have M > N, like Map, doesn’t triger shuffle
when M is greatly larger than N, it’s better to manually triger shuffle, otherwise
it affects parallelization and performance.
Wide dependency, usually we have M < N, like Join, groupByKey, triger shuffle
5. Inferschema dependency: