Spark Core es la base de todo el proyecto. Proporciona distribuye despacho de tareas, programación y funciones básicas de E / S. Spark utiliza una estructura de datos fundamental especializada conocida como RDD (Conjuntos de datos distribuidos resilientes) que es una colección lógica de los datos particionados a través de máquinas. DDR se pueden crear de dos formas; uno está haciendo referencia a los conjuntos de datos en los sistemas de almacenamiento externo y la segunda es por las transformaciones que aplican (por ejemplo, mapa, filtro, reductor, se unen) en DDR existentes.
La abstracción RDD se expone a través de una API de lenguaje integrado. Esto simplifica la complejidad de la programación debido a la forma en que las aplicaciones manipulan DDR es similar a la manipulación de colecciones locales de datos.
Spark Shell
Spark ofrece un shell interactivo - una poderosa herramienta para analizar los datos de forma interactiva. Está disponible en cualquier Scala o el lenguaje Python. Abstracción principal de Spark es un conjunto distribuido de elementos llamados un Resilient conjunto de datos distribuido (RDD). DDR se puede crear desde Formatos de entrada Hadoop (como archivos HDFS) o mediante la transformación de otros DDR.
Abrir Spark Shell
El siguiente comando se utiliza para abrir la cáscara Spark.
$ spark-shell
Crear sencilla RDD
Vamos a crear una sencilla RDD del archivo de texto. Utilice el siguiente comando para crear un simple RDD.
scala> val inputfile = sc.textFile(“input.txt”)
La salida del comando anterior es
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
La API RDD Spark introduce algunas transformaciones y pocas accionespara manipular RDD.
RDD Transformaciones
Transformaciones RDD devuelve puntero al nuevo RDD y le permite crear dependencias entre DDR. Cada RDD en cadena de la dependencia (Cadena de dependencias) tiene una función para el cálculo de sus datos y tiene un puntero (dependencia) a su matriz RDD.
Spark es perezoso, así que nada se ejecutará a menos que llame alguna transformación o acción que activará la creación de empleo y la ejecución. Mira el siguiente fragmento del ejemplo número de palabras.
Por lo tanto, la transformación RDD no es un conjunto de datos, pero es un paso en un programa (que podría ser el único paso) diciendo Spark cómo obtener los datos y qué hacer con él.
A continuación hay una lista de transformaciones RDD.
S.No | Transformaciones y Significado |
---|---|
1 | Mapa (func) Devuelve un nuevo conjunto de datos distribuida, formado haciendo pasar cada elemento de la fuente a través de una función func. |
2 | filtro (func) Devuelve un nuevo conjunto de datos formado por la selección de los elementos de la fuente en la que func devuelve true. |
3 | flatMap (func) Al igual que en el mapa, pero cada elemento de entrada se puede asignar a 0 o más elementos de salida (modo func debe devolver una Sec en lugar de un solo artículo). |
4 | mapPartitions (func) Similar al mapa, pero corre por separado en cada partición (bloque) de la RDD, por lo func debe ser de tipo Iterator <T> ⇒ Iterator <U> cuando se ejecuta en un RDD de tipo T. |
5 | mapPartitionsWithIndex (func) Al igual que en el mapa de particiones, sino que también proporcionafunc con un valor entero que representa el índice de la partición, así que func debe ser del tipo (Int, Iterator <T>) ⇒ Iterator <U> cuando se ejecuta en un RDD de tipo T. |
6 | muestra (withReplacement, fracción, semilla) Pruebe una fracción de los datos, con o sin sustitución, utilizando una semilla generador de números aleatorios dado. |
7 | union (otherDataset) Devuelve un nuevo conjunto de datos que contiene la unión de los elementos en el conjunto de datos de origen y el argumento. |
8 | intersección (otherDataset) Devuelve un nuevo RDD que contiene la intersección de elementos en el conjunto de datos de origen y el argumento. |
9 | distinta ([numTasks]) Devuelve un nuevo conjunto de datos que contiene los distintos elementos del conjunto de datos de origen. |
10 | groupByKey ([numTasks]) Cuando se llama a un conjunto de datos de (K, V) pares, devuelve un conjunto de datos de (K, iterable <V>) pares. Nota - Si está agrupando a fin de realizar una agregación (como una suma o promedio) sobre cada tecla, usando reduceByKey o aggregateByKey rendirá mucho mejor rendimiento. |
11 | reduceByKey (func, [numTasks]) Cuando se llama a un conjunto de datos de (K, V) pares, devuelve un conjunto de datos de (K, V) pares donde los valores para cada tecla se agregan utilizando el dado reducir la función func, que debe ser de tipo (V, V) ⇒ V . Al igual que en groupByKey, el número de reducir las tareas se puede configurar a través de un segundo argumento opcional. |
12 | aggregateByKey (zeroValue) (seqOp, combOp, [numTasks]) Cuando se llama a un conjunto de datos de (K, V) pares, devuelve un conjunto de datos de (K, T) pares donde los valores para cada tecla se agregan utilizando las funciones dadas combinar y neutral valor "cero". Permite un tipo de valor agregado que es diferente del tipo de valor de entrada, evitando al mismo tiempo las asignaciones innecesarias. Al igual que en groupByKey, el número de reducir las tareas se puede configurar a través de un segundo argumento opcional. |
13 | sortByKey ([ascendente], [numTasks]) Cuando se llama a un conjunto de datos de (K, V) pares donde implementos K pidió, devuelve un conjunto de datos de (K, V) pares ordenados por claves en orden ascendente o descendente, según se especifica en el argumento ascendente de Boole. |
14 | join (otherDataset, [numTasks]) Cuando se llama en conjuntos de datos de tipo (k, V) y (K, W), devuelve un conjunto de datos de pares con todos los pares de elementos para cada tecla (K, (V, W)). Combinaciones externas son apoyados a través leftOuterJoin, rightOuterJoin y fullOuterJoin. |
15 | cogroup (otherDataset, [numTasks]) Cuando pidió a los conjuntos de datos de tipo (K, V) y (K, W), devuelve un conjunto de datos de (K, (Iterable <V>, iterable <W>)) tuplas. Esta operación también se denomina grupo de Con. |
16 | cartesiano (otherDataset) Cuando pidió a los conjuntos de datos de tipo T y U, devuelve un conjunto de datos de (T, U) pares (todos los pares de elementos). |
17 | tubería (mando, [envvars]) Pipe cada partición del RDD a través de un comando de shell, por ejemplo, un Perl o script bash. Elementos RDD se escriben en la entrada estándar y las líneas de salida del proceso a su salida estándar se devuelven como una RDD de cadenas. |
18 | coalescer (numPartitions) Disminuir el número de particiones en el RDD a numPartitions. Útil para las operaciones en funcionamiento más eficiente después de filtrar por un gran conjunto de datos. |
19 | reparto (numPartitions) Reordenar los datos de la RDD al azar para crear ya sea más o menos particiones y el equilibrio que a través de ellos. Esto siempre baraja todos los datos sobre la red. |
20 | repartitionAndSortWithinPartitions (particionador) Reparticionar el RDD según el particionador dado y, dentro de cada partición resultante, ordenar los registros por parte de sus llaves. Esto es más eficiente que llamar reparto y luego la clasificación dentro de cada partición, ya que puede empujar hacia abajo en la clasificación de la maquinaria aleatoria. |
Acciones
La siguiente tabla muestra una lista de acciones, que devuelven valores.
S.No | Acción y Significado |
---|---|
1 | reducir (func) Agregar los elementos del conjunto de datos utilizando una funciónfunc (que toma dos argumentos y devuelve uno). La función debe ser conmutativa y asociativa de modo que se puede calcular correctamente en paralelo. |
2 | recoger() Devuelve todos los elementos del conjunto de datos como un conjunto en el programa piloto. Esto suele ser útil después de un filtro u otra operación que devuelve un suficientemente pequeño subconjunto de los datos. |
3 | count () Devuelve el número de elementos en el conjunto de datos. |
4 | primero() Devuelve el primer elemento del conjunto de datos (similar a tomar (1)). |
5 | tomado) Devuelve una matriz con los primeros n elementos del conjunto de datos. |
6 | takeSample (withReplacement, num, [semilla]) Devuelve una matriz con una muestra aleatoria de num elementos del conjunto de datos, con o sin reemplazo, opcionalmente pre-especificando una semilla generador de números aleatorios. |
7 | takeOrdered (n, [pedido]) Devuelve los primeros n elementos de la RDD utilizando cualquiera de su orden natural o un comparador personalizado. |
8 | saveAsTextFile (ruta) Escribe los elementos del conjunto de datos como un archivo de texto (o un conjunto de archivos de texto) en un directorio dado en el sistema de archivos local, HDFS o cualquier otro sistema de archivo compatible-Hadoop. Spark llama a toString sobre cada elemento para convertirlo en una línea de texto en el archivo. |
9 | saveAsSequenceFile (ruta) (Java y Scala) Escribe los elementos del conjunto de datos como Hadoop SequenceFile en una ruta dada en el sistema de archivos local, HDFS o cualquier otro sistema de archivo compatible-Hadoop. Esto está disponible en DDR de pares clave-valor que implementan la interfaz grabable de Hadoop. En Scala, que también está disponible en los tipos que son implícitamente convertible en grabables (Spark incluye conversiones de tipos básicos como int, double, String, etc). |
10 | saveAsObjectFile (ruta) (Java y Scala) Escribe los elementos del conjunto de datos en un formato simple usando serialización Java, que luego puede ser cargado usando SparkContext.objectFile (). |
11 | countByKey () Sólo disponible en DDR de tipo (K, V). Devuelve un HashMap de pares con el recuento de cada tecla (K, Int). |
12 | foreach (func) Ejecuta una función func a cada elemento del conjunto de datos. Esto es por lo general, hecho por efectos secundarios tales como la actualización de un acumulador o interactuar con los sistemas de almacenamiento externos. Nota - la modificación de variables distintas de Acumuladores fuera del foreach () puede dar lugar a un comportamiento indefinido.Consulte Descripción de cierres para más detalles. |
Programación con RDD
Veamos las implementaciones de algunas transformaciones y acciones RDD en la programación RDD con la ayuda de un ejemplo.
Ejemplo
Consideremos un ejemplo recuento de palabras - Se cuenta cada palabra que aparece en un documento. Considere el siguiente texto como una entrada y se guarda como un input.txt archivo en un directorio principal.
input.txt - archivo de entrada.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Siga el procedimiento descrito a continuación para ejecutar el ejemplo dado.
Abrir Spark-Shell
El siguiente comando se utiliza para abrir la cáscara chispa. En general, la chispa se construye utilizando Scala. Por lo tanto, un programa Spark se ejecuta en entorno Scala.
$ spark-shell
Si la cáscara Spark abre correctamente entonces usted encontrará la siguiente salida. Mira la última línea del "contexto Spark disponible como sc" medios de salida del contenedor de chispa se crea automáticamente objeto de contexto de la chispa con el nombre sc. Antes de iniciar el primer paso de un programa, se debe crear el objeto SparkContext.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Crear una RDD
En primer lugar, tenemos que leer el archivo de entrada usando API Spark-Scala y crear un RDD.
El siguiente comando se utiliza para leer un archivo de un lugar determinado.Aquí, nuevo RDD se crea con el nombre del archivo de entrada. La cadena que se da como argumento en el archivo de texto ("") método es ruta absoluta para el nombre de archivo de entrada. Sin embargo, si se le da el nombre de archivo, entonces significa que el archivo de entrada está en la ubicación actual.
scala> val inputfile = sc.textFile("input.txt")
Ejecutar Palabra Transformación recuento
Nuestro objetivo es contar las palabras en un archivo. Crear un mapa plano para dividir cada línea en palabras (flatMap (línea ⇒ linea.split ("")).
A continuación, lea cada palabra como una clave con un valor "1" (<clave, valor> = <palabra, 1>) utilizando la función de mapa (mapa (palabra ⇒ (palabra, 1)).
Por último, reducir esas llaves añadiendo valores de claves similares(reduceByKey (_ + _)).
El siguiente comando se utiliza para la ejecución de la lógica recuento de palabras. Después de ejecutar esta, no vas a encontrar ninguna salida porque esto no es una acción, esto es una transformación; señalando un nuevo RDD o decirle chispa para qué hacer con los datos dados)
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
RDD actual
Mientras trabajaba con la RDD, si usted quiere saber sobre la corriente RDD, a continuación, utilice el siguiente comando. Se le mostrará la descripcion de RDD actual y sus dependencias para la depuración.
scala> counts.toDebugString
Almacenamiento en caché de las Transformaciones
Puede marcar un RDD se persistió con el persistir () o caché () métodos en él.La primera vez que se calcula en una acción, que se mantendrá en la memoria en los nodos. Utilice el siguiente comando para almacenar las transformaciones intermedias en la memoria.
scala> counts.cache()
La aplicación de la Acción
La aplicación de una acción, como la tienda de todas las transformaciones, los resultados en un archivo de texto. El argumento de cadena para saveAsTextFile ("") método es la ruta absoluta de la carpeta de salida. Pruebe el siguiente comando para guardar la salida en un archivo de texto. En el siguiente ejemplo, la carpeta de 'salida' se encuentra en la ubicación actual.
scala> counts.saveAsTextFile("output")
Comprobación de la salida
Abra otro terminal para ir al directorio principal (donde la chispa se ejecuta en el otro terminal). Utilice los siguientes comandos para comprobar el directorio de salida.
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS
El siguiente comando se utiliza para ver la salida de la Parte-00000 archivos.
[hadoop@localhost output]$ cat part-00000
Producción
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
El siguiente comando se utiliza para ver la salida de la Parte-00001 archivos.
[hadoop@localhost output]$ cat part-00001
Producción
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
ONU Persistir el almacenamiento
Antes de ONU-persistente, si quieres ver el espacio de almacenamiento que se utiliza para esta aplicación, a continuación, utilizar la siguiente URL en su navegador.
http://localhost:4040
Verá la siguiente pantalla, que muestra el espacio de almacenamiento utilizado para la aplicación, que se ejecuta en el shell Spark.

Si desea ONU-persistir el espacio de almacenamiento de especial RDD, a continuación, utilice el siguiente comando.
Scala> counts.unpersist()
Verá la salida de la siguiente manera -
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
Para verificar el espacio de almacenamiento en el navegador, utilice la siguiente URL.
http://localhost:4040/
Verá la siguiente pantalla. Muestra el espacio de almacenamiento utilizado para la aplicación, que se ejecuta en el shell Spark.
No hay comentarios.:
Publicar un comentario