Las industrias están utilizando ampliamente Hadoop para analizar sus conjuntos de datos. La razón es que el marco Hadoop se basa en un modelo de programación simple (MapReduce) y permite una solución informática que es escalable, flexible, tolerante a fallas y rentable. Aquí, la principal preocupación es mantener la velocidad en el procesamiento de grandes conjuntos de datos en términos de tiempo de espera entre consultas y tiempo de espera para ejecutar el programa.
Spark fue presentado por Apache Software Foundation para acelerar el proceso del software de computación computacional Hadoop.
A diferencia de la creencia común, Spark no es una versión modificada de Hadoop y, en realidad, no depende de Hadoop porque tiene su propia administración de clústeres. Hadoop es solo una de las formas de implementar Spark.
Spark usa Hadoop de dos maneras: una es almacenamiento y la segunda es procesamiento . Dado que Spark tiene su propio cálculo de gestión de clústeres, utiliza Hadoop solo con fines de almacenamiento.
Tabla de contenido
- Spark apache
- Evolución de Apache Spark
- Características de Apache Spark
- Spark Construido en Hadoop
- Componentes de Spark
- Núcleo Apache Spark
- Spark SQL
- Transmisión de Spark
- MLlib (Biblioteca de aprendizaje automático)
- Apache Spark – RDD
- Apache Spark – Instalación
- Apache Spark – Programación básica
- Apache Spark – Implementación
- Programación avanzada de Sparks
Spark apache
Apache Spark es una tecnología de cómputo en clúster ultrarrápida, diseñada para un cómputo rápido. Se basa en Hadoop MapReduce y amplía el modelo MapReduce para usarlo de manera eficiente para más tipos de cálculos, que incluyen consultas interactivas y procesamiento de flujo. La característica principal de Spark es su computación en clúster en memoria que aumenta la velocidad de procesamiento de una aplicación.
Spark está diseñado para cubrir una amplia gama de cargas de trabajo, como aplicaciones por lotes, algoritmos iterativos, consultas interactivas y transmisión. Además de admitir todas estas cargas de trabajo en un sistema respectivo, reduce la carga de gestión de mantener herramientas separadas.
Evolución de Apache Spark
Spark es uno de los subproyectos de Hadoop desarrollado en 2009 en AMPLab de UC Berkeley por Matei Zaharia. Fue Open Sourced en 2010 bajo una licencia BSD. Fue donado a la fundación de software Apache en 2013, y ahora Apache Spark se ha convertido en un proyecto Apache de primer nivel desde febrero de 2014.
Características de Apache Spark
Apache Spark tiene las siguientes características.
- Velocidad : Spark ayuda a ejecutar una aplicación en el clúster de Hadoop, hasta 100 veces más rápido en la memoria y 10 veces más rápido cuando se ejecuta en el disco. Esto es posible al reducir el número de operaciones de lectura/escritura en el disco. Almacena los datos de procesamiento intermedio en la memoria.
- Admite varios idiomas : Spark proporciona API integradas en Java, Scala o Python. Por lo tanto, puede escribir aplicaciones en diferentes idiomas. Spark presenta 80 operadores de alto nivel para consultas interactivas.
- Análisis avanzado : Spark no solo es compatible con ‘Mapear’ y ‘reducir’. También admite consultas SQL, transmisión de datos, aprendizaje automático (ML) y algoritmos gráficos.
Spark Construido en Hadoop
El siguiente diagrama muestra tres formas de construir Spark con componentes de Hadoop.
Hay tres formas de implementación de Spark, como se explica a continuación.
- Independiente : la implementación independiente de Spark significa que Spark ocupa el lugar en la parte superior de HDFS (Sistema de archivos distribuido de Hadoop) y el espacio se asigna para HDFS, explícitamente. Aquí, Spark y MapReduce se ejecutarán en paralelo para cubrir todos los trabajos de Spark en el clúster.
- Hadoop Yarn : la implementación de Hadoop Yarn significa, simplemente, que Spark se ejecuta en Yarn sin necesidad de preinstalación ni acceso a la raíz. Ayuda a integrar Spark en el ecosistema de Hadoop o en la pila de Hadoop. Permite que otros componentes se ejecuten encima de la pila.
- Spark en MapReduce (SIMR) : Spark en MapReduce se usa para iniciar un trabajo de Spark además de la implementación independiente. Con SIMR, el usuario puede iniciar Spark y usar su shell sin ningún acceso administrativo.
Componentes de Spark
La siguiente ilustración muestra los diferentes componentes de Spark.
Núcleo Apache Spark
Spark Core es el motor de ejecución general subyacente para la plataforma Spark en el que se basan todas las demás funciones. Proporciona conjuntos de datos de computación y referencia en memoria en sistemas de almacenamiento externo.
Spark SQL
Spark SQL es un componente sobre Spark Core que presenta una nueva abstracción de datos llamada SchemaRDD, que brinda soporte para datos estructurados y semiestructurados.
Transmisión de Spark
Spark Streaming aprovecha la capacidad de programación rápida de Spark Core para realizar análisis de transmisión. Ingiere datos en mini lotes y realiza transformaciones RDD (Conjuntos de datos distribuidos resistentes) en esos mini lotes de datos.
MLlib (Biblioteca de aprendizaje automático)
MLlib es un marco de aprendizaje automático distribuido por encima de Spark debido a la arquitectura Spark basada en memoria distribuida. Es, según los puntos de referencia, realizado por los desarrolladores de MLlib contra las implementaciones de Alternating Least Squares (ALS). Spark MLlib es nueve veces más rápido que la versión basada en disco Hadoop de Apache Mahout (antes de que Mahout obtuviera una interfaz Spark).
GráficoX
GraphX es un marco de procesamiento de gráficos distribuido sobre Spark. Proporciona una API para expresar el cálculo de gráficos que puede modelar los gráficos definidos por el usuario mediante el uso de la API de abstracción de Pregel. También proporciona un tiempo de ejecución optimizado para esta abstracción.
Apache Spark – RDD
Conjuntos de datos distribuidos resistentes
Los conjuntos de datos distribuidos resistentes (RDD) son una estructura de datos fundamental de Spark. Es una colección distribuida inmutable de objetos. Cada conjunto de datos en RDD se divide en particiones lógicas, que se pueden calcular en diferentes nodos del clúster. Los RDD pueden contener cualquier tipo de objetos de Python, Java o Scala, incluidas las clases definidas por el usuario.
Formalmente, un RDD es una colección de registros particionados de solo lectura. Los RDD se pueden crear a través de operaciones deterministas en datos en almacenamiento estable u otros RDD. RDD es una colección tolerante a fallas de elementos que se pueden operar en paralelo.
Hay dos formas de crear RDD: paralelizar una colección existente en su programa de controlador o hacer referencia a un conjunto de datos en un sistema de almacenamiento externo, como un sistema de archivos compartido, HDFS, HBase o cualquier fuente de datos que ofrezca un formato de entrada de Hadoop.
Spark hace uso del concepto de RDD para lograr operaciones de MapReduce más rápidas y eficientes. Analicemos primero cómo se llevan a cabo las operaciones de MapReduce y por qué no son tan eficientes.
El intercambio de datos es lento en MapReduce
MapReduce se adopta ampliamente para procesar y generar grandes conjuntos de datos con un algoritmo paralelo distribuido en un clúster. Permite a los usuarios escribir cálculos paralelos, utilizando un conjunto de operadores de alto nivel, sin tener que preocuparse por la distribución del trabajo y la tolerancia a fallas.
Desafortunadamente, en la mayoría de los marcos actuales, la única forma de reutilizar datos entre cálculos (Ej., entre dos trabajos de MapReduce) es escribirlos en un sistema de almacenamiento estable externo (Ej., HDFS). Aunque este marco proporciona numerosas abstracciones para acceder a los recursos computacionales de un clúster, los usuarios todavía quieren más.
Tanto las aplicaciones iterativas como las interactivas requieren un intercambio de datos más rápido entre trabajos paralelos. El uso compartido de datos es lento en MapReduce debido a la replicación, la serialización y la E/S del disco . En cuanto al sistema de almacenamiento, la mayoría de las aplicaciones de Hadoop pasan más del 90% del tiempo realizando operaciones de lectura y escritura HDFS.
Operaciones iterativas en MapReduce
Reutilice los resultados intermedios en múltiples cálculos en aplicaciones de varias etapas. La siguiente ilustración explica cómo funciona el marco actual, mientras se realizan las operaciones iterativas en MapReduce. Esto genera gastos generales considerables debido a la replicación de datos, la E/S de disco y la serialización, lo que hace que el sistema sea lento.
Operaciones interactivas en MapReduce
El usuario ejecuta consultas ad-hoc en el mismo subconjunto de datos. Cada consulta realizará la E/S del disco en el almacenamiento estable, que puede dominar el tiempo de ejecución de la aplicación.
La siguiente ilustración explica cómo funciona el marco actual mientras se realizan consultas interactivas en MapReduce.
Uso compartido de datos con Spark RDD
El uso compartido de datos es lento en MapReduce debido a la replicación, la serialización y la E/S del disco . La mayoría de las aplicaciones de Hadoop pasan más del 90 % del tiempo realizando operaciones de lectura y escritura de HDFS.
Al reconocer este problema, los investigadores desarrollaron un marco especializado llamado Apache Spark. La idea clave de Spark son conjuntos de datos distribuidos resistentes ( RDD); es compatible con el cálculo de procesamiento en memoria. Esto significa que almacena el estado de la memoria como un objeto entre los trabajos y el objeto se puede compartir entre esos trabajos. El intercambio de datos en la memoria es de 10 a 100 veces más rápido que la red y el disco.
Intentemos ahora averiguar cómo se llevan a cabo las operaciones iterativas e interactivas en Spark RDD.
Operaciones iterativas en Spark RDD
La siguiente ilustración muestra las operaciones iterativas en Spark RDD. Almacenará resultados intermedios en una memoria distribuida en lugar de almacenamiento estable (disco) y hará que el sistema sea más rápido.
Nota : si la memoria distribuida (RAM) es suficiente para almacenar resultados intermedios (estado del TRABAJO), entonces almacenará esos resultados en el disco.
Operaciones interactivas en Spark RDD
Esta ilustración muestra operaciones interactivas en Spark RDD. Si se ejecutan consultas diferentes en el mismo conjunto de datos repetidamente, estos datos en particular se pueden guardar en la memoria para mejorar los tiempos de ejecución.
De forma predeterminada, cada RDD transformado se puede volver a calcular cada vez que ejecuta una acción en él. Sin embargo, también puede conservar un RDD en la memoria, en cuyo caso Spark mantendrá los elementos en el clúster para un acceso mucho más rápido la próxima vez que lo consulte. También hay soporte para RDD persistentes en el disco o replicados en múltiples nodos.
Apache Spark – Instalación
Spark es el subproyecto de Hadoop. Por lo tanto, es mejor instalar Spark en un sistema basado en Linux. Los siguientes pasos muestran cómo instalar Apache Spark.
Paso 1: Verificación de la instalación de Java
La instalación de Java es una de las cosas obligatorias para instalar Spark. Pruebe el siguiente comando para verificar la versión de JAVA.
$java -version
Si Java ya está instalado en su sistema, verá la siguiente respuesta:
java version “1.7.0_71”
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
En caso de que no tenga Java instalado en su sistema, instale Java antes de continuar con el siguiente paso.
Paso 2: Verificación de la instalación de Scala
Debe utilizar el lenguaje Scala para implementar Spark. Entonces, verifiquemos la instalación de Scala usando el siguiente comando.
$scala -version
Si Scala ya está instalado en su sistema, verá la siguiente respuesta:
Scala code runner version 2.11.6 — Copyright 2002-2013, LAMP/EPFL
En caso de que no tenga Scala instalado en su sistema, continúe con el siguiente paso para la instalación de Scala.
Paso 3: Descargar Scala
Descargue la última versión de Scala visitando el siguiente enlace Descargar Scala . Para este tutorial, estamos usando la versión scala-2.11.6. Después de la descarga, encontrará el archivo tar de Scala en la carpeta de descarga.
Paso 4: Instalación de Scala
Siga los pasos que se indican a continuación para instalar Scala.
Extraiga el archivo tar de Scala
Escriba el siguiente comando para extraer el archivo tar de Scala.
$ tar xvf scala-2.11.6.tgz
Mover archivos de software Scala
Utilice los siguientes comandos para mover los archivos del software Scala al directorio respectivo (/usr/local/scala) .
$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit
Establecer RUTA para Scala
Use el siguiente comando para configurar PATH para Scala.
$ export PATH = $PATH:/usr/local/scala/bin
Verificación de la instalación de Scala
Después de la instalación, es mejor verificarlo. Utilice el siguiente comando para verificar la instalación de Scala.
$scala -version
Si Scala ya está instalado en su sistema, verá la siguiente respuesta:
Scala code runner version 2.11.6 — Copyright 2002-2013, LAMP/EPFL
Paso 5: Descargar Apache Spark
Descarga la última versión de Spark visitando el siguiente enlace Descargar Spark . Para este tutorial, estamos usando la versión spark-1.3.1-bin-hadoop2.6 . Después de descargarlo, encontrará el archivo Spark tar en la carpeta de descarga.
Paso 6: Instalación de Spark
Siga los pasos que se indican a continuación para instalar Spark.
Extracción de tar Spark
El siguiente comando para extraer el archivo spark tar.
$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz
Mover archivos de software Spark
Los siguientes comandos para mover los archivos del software Spark al directorio respectivo (/usr/local/spark) .
$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit
Configuración del entorno para Spark
Agregue la siguiente línea al archivo ~ /.bashrc . Significa agregar la ubicación, donde se encuentran los archivos del software Spark a la variable PATH.
export PATH=$PATH:/usr/local/spark/bin
Utilice el siguiente comando para obtener el archivo ~/.bashrc.
$ source ~/.bashrc
Paso 7: Verificación de la instalación de Spark
Escriba el siguiente comando para abrir Spark Shell.
$spark-shell
Si Spark se instaló correctamente, encontrará el siguiente resultado.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service ‘HTTP class server’ on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ ‘_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Apache Spark – Programación básica
Spark Core es la base de todo el proyecto. Proporciona distribución de tareas distribuidas, programación y funcionalidades básicas de E/S. Spark utiliza una estructura de datos fundamental especializada conocida como RDD (Conjuntos de datos distribuidos resistentes) que es una colección lógica de datos particionados entre máquinas. Los RDD se pueden crear de dos maneras; uno es haciendo referencia a conjuntos de datos en sistemas de almacenamiento externo y el segundo es aplicando transformaciones (por ejemplo, mapear, filtrar, reducir, unir) en RDD existentes.
La abstracción de RDD se expone a través de una API integrada en el lenguaje. Esto simplifica la complejidad de la programación porque la forma en que las aplicaciones manipulan los RDD es similar a la manipulación de colecciones locales de datos.
Concha de Spark
Spark proporciona un shell interactivo, una poderosa herramienta para analizar datos de forma interactiva. Está disponible en lenguaje Scala o Python. La abstracción principal de Spark es una colección distribuida de elementos llamada Conjunto de datos distribuido resistente (RDD). Los RDD se pueden crear a partir de formatos de entrada de Hadoop (como archivos HDFS) o mediante la transformación de otros RDD.
Concha de Spark abierta
El siguiente comando se usa para abrir Spark Shell.
$ spark-shell
Crear RDD simple
Vamos a crear un RDD simple a partir del archivo de texto. Use el siguiente comando para crear un RDD simple.
scala> val inputfile = sc.textFile(“input.txt”)
La salida para el comando anterior es
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
La API de Spark RDD presenta pocas transformaciones y pocas acciones para manipular RDD.
Transformaciones RDD
Las transformaciones de RDD devuelven el puntero a un nuevo RDD y le permiten crear dependencias entre RDD. Cada RDD en cadena de dependencia (String of Dependencies) tiene una función para calcular sus datos y tiene un puntero (dependencia) a su RDD principal.
Spark es perezoso, por lo que no se ejecutará nada a menos que llame a alguna transformación o acción que desencadene la creación y ejecución del trabajo. Mire el siguiente fragmento del ejemplo de conteo de palabras.
Por lo tanto, la transformación RDD no es un conjunto de datos, sino un paso en un programa (podría ser el único paso) que le dice a Spark cómo obtener datos y qué hacer con ellos.
A continuación se muestra una lista de transformaciones RDD.
Comportamiento
La siguiente tabla proporciona una lista de Acciones, que devuelven valores.
Programación con RDD
Veamos las implementaciones de algunas transformaciones y acciones de RDD en la programación de RDD con la ayuda de un ejemplo.
Ejemplo
Considere un ejemplo de conteo de palabras: cuenta cada palabra que aparece en un documento. Considere el siguiente texto como una entrada y se guarda como un archivo input.txt en un directorio de inicio.
input.txt – archivo de entrada.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Siga el procedimiento que se indica a continuación para ejecutar el ejemplo dado.
Abrir Spark-Shell
El siguiente comando se usa para abrir Spark Shell. Generalmente, Spark se construye usando Scala. Por lo tanto, un programa Spark se ejecuta en el entorno Scala.
$ spark-shell
Si Spark Shell se abre con éxito, encontrará el siguiente resultado. Mire la última línea de la salida “Contexto de Spark disponible como sc” significa que el contenedor de Spark se crea automáticamente como objeto de contexto de Spark con el nombre sc . Antes de iniciar el primer paso de un programa, se debe crear el objeto SparkContext.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service ‘HTTP class server’ on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ ‘_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Crear un RDD
Primero, tenemos que leer el archivo de entrada usando la API de Spark-Scala y crear un RDD.
El siguiente comando se usa para leer un archivo desde una ubicación dada. Aquí, se crea un nuevo RDD con el nombre de archivo de entrada. La cadena que se proporciona como argumento en el método textFile(“”) es la ruta absoluta para el nombre del archivo de entrada. Sin embargo, si solo se proporciona el nombre del archivo, significa que el archivo de entrada se encuentra en la ubicación actual.
scala> val inputfile = sc.textFile(“input.txt”)
Ejecutar transformación de conteo de palabras
Nuestro objetivo es contar las palabras en un archivo. Crea un mapa plano para dividir cada línea en palabras ( flatMap(line ⇒ line.split(“ ”) ).
A continuación, lea cada palabra como una clave con un valor ‘1’ (<clave, valor> = <palabra, 1>) utilizando la función de mapa ( mapa(palabra ⇒ (palabra, 1) ).
Finalmente, reduzca esas claves agregando valores de claves similares ( reduceByKey(_+_) ).
El siguiente comando se utiliza para ejecutar la lógica de recuento de palabras. Después de ejecutar esto, no encontrará ningún resultado porque esto no es una acción, es una transformación; señalar un nuevo RDD o decirle a Spark qué hacer con los datos dados)
scala> val counts = inputfile.flatMap(line => line.split(” “)).map(word => (word, 1)).reduceByKey(_+_);
DDR actual
Mientras trabaja con el RDD, si desea conocer el RDD actual, utilice el siguiente comando. Le mostrará la descripción sobre el RDD actual y sus dependencias para la depuración.
scala> counts.toDebugString
Almacenamiento en caché de las transformaciones
Puede marcar un RDD para que se mantenga usando los métodos persist() o cache() en él. La primera vez que se calcula en una acción, se mantendrá en la memoria de los nodos. Utilice el siguiente comando para almacenar las transformaciones intermedias en la memoria.
scala> counts.cache()
Aplicar la acción
Aplicar una acción, como almacenar todas las transformaciones, da como resultado un archivo de texto. El argumento String para el método saveAsTextFile(“ ”) es la ruta absoluta de la carpeta de salida. Pruebe el siguiente comando para guardar la salida en un archivo de texto. En el siguiente ejemplo, la carpeta ‘salida’ está en la ubicación actual.
scala> counts.saveAsTextFile(“output”)
Comprobación de la salida
Abra otra terminal para ir al directorio de inicio (donde se ejecuta Spark en la otra terminal). Use los siguientes comandos para verificar el directorio de salida.
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS
El siguiente comando se usa para ver la salida de los archivos Part-00000 .
[hadoop@localhost output]$ cat part-00000
Producción
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
El siguiente comando se usa para ver la salida de los archivos Part-00001 .
[hadoop@localhost output]$ cat part-00001
Producción
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
ONU persiste el almacenamiento
Antes de cancelar la persistencia, si desea ver el espacio de almacenamiento que se utiliza para esta aplicación, utilice la siguiente URL en su navegador.
http://localhost:4040
Verá la siguiente pantalla, que muestra el espacio de almacenamiento utilizado para la aplicación, que se ejecuta en el shell de Spark.
Si desea cancelar la persistencia del espacio de almacenamiento de un RDD en particular, utilice el siguiente comando.
Scala> counts.unpersist()
Verá la salida de la siguiente manera:
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
Para verificar el espacio de almacenamiento en el navegador, use la siguiente URL.
http://localhost:4040/
Verá la siguiente pantalla. Muestra el espacio de almacenamiento utilizado para la aplicación, que se ejecuta en el shell de Spark.
Apache Spark – Implementación
La aplicación Spark, que usa spark-submit, es un comando de shell que se usa para implementar la aplicación Spark en un clúster. Utiliza todos los administradores de clúster respectivos a través de una interfaz uniforme. Por lo tanto, no tienes que configurar tu aplicación para cada uno.
Ejemplo
Tomemos el mismo ejemplo de conteo de palabras que usamos antes, usando comandos de shell. Aquí, consideramos el mismo ejemplo como una aplicación Spark.
Entrada de muestra
El siguiente texto son los datos de entrada y el nombre del archivo es in.txt .
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Mira el siguiente programa −
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( “local”, “Word Count”, “/usr/local/spark”, Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile(“in.txt”)
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ⇒ line.split(” “))
.map(word ⇒ (word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile(“outfile”)
System.out.println(“OK”);
}
}
Guarde el programa anterior en un archivo llamado SparkWordCount.scala y colóquelo en un directorio definido por el usuario llamado spark-application .
Nota : al transformar inputRDD en countRDD, estamos usando flatMap() para tokenizar las líneas (del archivo de texto) en palabras, el método map() para contar la frecuencia de palabras y el método reduceByKey() para contar cada repetición de palabra.
Utilice los siguientes pasos para enviar esta solicitud. Ejecute todos los pasos en el directorio de aplicaciones de Spark a través de la terminal.
Paso 1: Descarga Spark Ja
Se requiere Spark core jar para la compilación, por lo tanto, descargue spark-core_2.10-1.3.0.jar desde el siguiente enlace Spark core jar y mueva el archivo jar del directorio de descarga al directorio de aplicación de Spark .
Paso 2: Compilar el programa
Compile el programa anterior usando el comando dado a continuación. Este comando debe ejecutarse desde el directorio de aplicaciones de Spark. Aquí, /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar es un jar de soporte de Hadoop tomado de la biblioteca Spark.
$ scalac -classpath “spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar” SparkPi.scala
Paso 3: Crea un JAR
Cree un archivo jar de la aplicación Spark con el siguiente comando. Aquí, wordcount es el nombre del archivo para el archivo jar.
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
Paso 4: envíe la solicitud de Spark
Envíe la aplicación Spark usando el siguiente comando:
spark-submit –class SparkWordCount –master local wordcount.jar
Si se ejecuta con éxito, encontrará el resultado que se muestra a continuación. El OK que deja entrar la siguiente salida es para la identificación del usuario y esa es la última línea del programa. Si lee atentamente el siguiente resultado, encontrará cosas diferentes, como:
- inició con éxito el servicio ‘sparkDriver’ en el puerto 42954
- MemoryStore comenzó con una capacidad de 267,3 MB
- Inició SparkUI en http://192.168.1.217:4040
- Se agregó el archivo JAR:/home/hadoop/piapplication/count.jar
- ResultStage 1 (saveAsTextFile en SparkPi.scala:11) finalizó en 0,566 s
- Se detuvo la interfaz de usuario web de Spark en http://192.168.1.217:4040
- MemoryStore borrado
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service ‘sparkDriver’ on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service ‘HTTP file server’ on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Paso 5: Comprobación de la salida
Después de la ejecución exitosa del programa, encontrará el directorio llamado outfile en el directorio de aplicaciones de Spark.
Los siguientes comandos se utilizan para abrir y verificar la lista de archivos en el directorio outfile.
$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
Los comandos para verificar la salida en el archivo part-00000 son:
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
Los comandos para verificar la salida en el archivo part-00001 son:
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
Vaya a la siguiente sección para saber más sobre el comando ‘spark-submit’.
Sintaxis de envío de Spark
spark-submit [options] <app jar | python file> [app arguments]
Opciones
La siguiente tabla describe una lista de opciones :
Programación avanzada de Sparks
Spark contiene dos tipos diferentes de variables compartidas: una son variables de transmisión y la segunda son acumuladores .
- Variables de difusión : se utilizan para distribuir de manera eficiente valores grandes.
- Acumuladores : se utilizan para agregar la información de una colección particular.
Variables de difusión
Las variables de difusión permiten al programador mantener una variable de solo lectura en caché en cada máquina en lugar de enviar una copia con las tareas. Se pueden usar, por ejemplo, para dar a cada nodo una copia de un gran conjunto de datos de entrada, de manera eficiente. Spark también intenta distribuir variables de transmisión utilizando algoritmos de transmisión eficientes para reducir los costos de comunicación.
Las acciones de Spark se ejecutan a través de un conjunto de etapas, separadas por operaciones de “reproducción aleatoria” distribuidas. Spark transmite automáticamente los datos comunes que necesitan las tareas dentro de cada etapa.
Los datos transmitidos de esta manera se almacenan en caché en forma serializada y se deserializan antes de ejecutar cada tarea. Esto significa que la creación explícita de variables de transmisión solo es útil cuando las tareas en varias etapas necesitan los mismos datos o cuando es importante almacenar en caché los datos en formato deserializado.
Las variables de transmisión se crean a partir de una variable v llamando a SparkContext.broadcast(v) . La variable de difusión es un envoltorio alrededor de v , y se puede acceder a su valor llamando al método de valor . El código dado a continuación muestra esto:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
Salida −
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
Después de crear la variable de difusión, debe usarse en lugar del valor v en cualquier función que se ejecute en el clúster, de modo que v no se envíe a los nodos más de una vez. Además, el objeto v no debe modificarse después de su transmisión, para garantizar que todos los nodos obtengan el mismo valor de la variable de transmisión.
Acumuladores
Los acumuladores son variables que solo se “agregan” a través de una operación asociativa y, por lo tanto, pueden admitirse de manera eficiente en paralelo. Se pueden usar para implementar contadores (como en MapReduce) o sumas. Spark admite de forma nativa acumuladores de tipos numéricos y los programadores pueden agregar compatibilidad con nuevos tipos. Si los acumuladores se crean con un nombre, se mostrarán en la interfaz de usuario de Spark . Esto puede ser útil para comprender el progreso de las etapas en ejecución (NOTA: esto aún no es compatible con Python).
Se crea un acumulador a partir de un valor inicial v llamando a SparkContext.accumulator(v) . Las tareas que se ejecutan en el clúster se pueden agregar mediante el método de agregar o el operador += (en Scala y Python). Sin embargo, no pueden leer su valor. Solo el programa controlador puede leer el valor del acumulador, utilizando su método de valor .
El código que se proporciona a continuación muestra un acumulador que se usa para sumar los elementos de una matriz:
scala> val accum = sc.accumulator(0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
Si desea ver el resultado del código anterior, use el siguiente comando:
scala> accum.value
Producción
res2: Int = 10
Operaciones numéricas RDD
Spark le permite realizar diferentes operaciones en datos numéricos, utilizando uno de los métodos API predefinidos. Las operaciones numéricas de Spark se implementan con un algoritmo de transmisión que permite construir el modelo, un elemento a la vez.
Estas operaciones se calculan y devuelven como un objeto StatusCounter llamando al método status() .
La siguiente es una lista de métodos numéricos disponibles en StatusCounter .
Si desea utilizar solo uno de estos métodos, puede llamar al método correspondiente directamente en RDD.