Recently a colleague at my company used flexible.js on a mobile web page.

if (isIPhone) {
    // On iOS, for 2x and 3x screens use the 2x scheme; for everything else use the 1x scheme
    if (devicePixelRatio >= 3 && (!dpr || dpr >= 3)) {
        dpr = 3;
    } else if (devicePixelRatio >= 2 && (!dpr || dpr >= 2)) {
        dpr = 2;
    } else {
        dpr = 1;
    }
} else {
    // On other devices, stick with the 1x scheme
    dpr = 1;
}

In this code, if the UA does not contain "iphone", dpr is set to 1. The stated reasoning:

Taobao's approach only special-cases iOS and sets dpr to 1 on all Android devices. The official explanation is that Android vendors modify dpr themselves, so window.devicePixelRatio on Android cannot be trusted.

(source) But if an Android device's devicePixelRatio is genuine and greater than 1, this scheme simply does not apply to it. Now look at another piece of code:

if (metaEl) {
    console.warn('将根据已有的meta标签来设置缩放比例');
    var match = metaEl.getAttribute('content').match(/initial\-scale=([\d\.]+)/);
    if (match) {
        scale = parseFloat(match[1]);
        dpr = parseInt(1 / scale);
    }
}

If you have already set a meta viewport with initial-scale=1.0 yourself, then scale is 1 and dpr can only be 1. If initial-scale could be set dynamically from devicePixelRatio, the 1px-border problem would be solved, but in both cases above scale can only be 1. Now on to one more piece of code:

function refreshRem(){
    var width = docEl.getBoundingClientRect().width;
    if (width / dpr > 540) {
        width = 540 * dpr;
    }
    var rem = width / 10;
    docEl.style.fontSize = rem + 'px';
    flexible.rem = win.rem = rem;
}

If width / dpr > 540, the width is capped at 540 * dpr. Cross-checking with the [Umeng index report](http://tip.umeng.com/uploads/data_report/mobile_phone_report_.pdf), 540 no longer even appears in Umeng's statistics, i.e. it has been excluded from the mainstream resolutions. In other words, on today's mainstream resolutions the html font-size is simply pinned at 54px. The deeper problem is that the reference value should be the design draft width (in px) rather than a hard-coded 540. If you want to set the maximum width flexibly from the design draft, you can use [hotcss](https://github.com/imochen/hotcss), a scheme similar to flexible that lets you set a max-width on the meta tag and also declare the design width in scss/less.

So is flexible still a good fit for current development? Judging from the demo sites listed by hotcss, plus NetEase News, a few sites are still using it, yet Tmall and Taobao themselves are not. Why is that? Let me first answer how flexible solves adaptation:

1. It sets the html element's base font size (in px) according to dpr, so that element sizes and margins specified in rem scale with the screen.
2. Because point 1 dynamically changes the base font size, we then need \[data-dpr\] selectors to set element font sizes per dpr; this is paying for a new problem created by the solution to the first one.
3. It sets the viewport scale according to dpr, to fix the fact that a 1px border looks slightly thicker when dpr > 1 than when dpr = 1. But as we saw above, scale ends up being 1 in most cases.

There are other issues too: [after using scale on iOS, an embedded AMap map gets scaled down as well — how to fix it?](https://github.com/amfe/lib-flexible/issues/54) and [WeChat QR code recognition problems](https://github.com/amfe/article/issues/17#issuecomment-259130658). The fix eventually suggested in both issues is, again, to set dpr to 1. So in practice flexible only delivers the first of the three items above. Now let me answer why I would not use flexible:

1. The 1px border problem can be handled with

``` css
.scale{
    position: relative;
}
.scale:after{
    content:"";
    position: absolute;
    bottom:0px;
    left:0px;
    right:0px;
    border-bottom:1px solid #ddd;
    -webkit-transform:scaleY(.5);
    -webkit-transform-origin:0 0;
}
```

or simply not handled at all. My view is that whether 1px maps to one, two or N physical pixels is the browser's rendering decision. Tmall and Taobao now specify widths, heights and margins in px, even though the result is that elements look slightly different in size on ordinary Android devices versus Retina screens.

2. For pages that rely heavily on absolute positioning, we can simply use `<meta name="viewport" content="width=<design width in px>">`.

Is there still a reason to use flexible, then? Yes: if your design draft is 540 wide, or you set the maximum width flexibly from the design draft, then on devices whose screens are narrower than the design width, element sizes and margins scale down nicely thanks to the dynamically set html font-size. The premise, of course, is that the design width matches mainstream mobile screen widths; if the design width is already small, ask yourself which part of flexible you are actually using.

The idea: salt + redis combined with a Python web framework. Reference project: saltobserver, built with Flask to display the results of the salt redis returner; it uses gevent to fetch data from redis in real time and push it to websocket clients. Initially I wanted to use asyncio, since asyncio integrates easily with Tornado:

import asyncio

import tornado.options
from tornado.options import options
from tornado.platform.asyncio import AsyncIOMainLoop


def main():
    tornado.options.parse_config_file("config.py")
    tornado.options.parse_command_line()
    AsyncIOMainLoop().install()
    application = Application()
    application.listen(options.port)
    loop = asyncio.get_event_loop()
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.close()

The tornado-redis author says that if you use redis pubsub, he recommends this project. Its demos include a sockjs demo that shows how to build a public chat site with tornado-redis. That demo uses sockjs-tornado, which exists to integrate with SockJS, i.e. to serve ordinary HTTP requests and websockets at the same time. I don't care about backwards compatibility, so:

subscriber = tornadoredis.pubsub.SockJSSubscriber(tornadoredis.Client())

Do I actually need this tornadoredis.pubsub.SockJSSubscriber class? It is just a wrapper that inherits from BaseSubscriber and overrides on_message to broadcast to its subscribers.
Apparently not. So I can probably use SocketIOSubscriber instead, handle subscribe/unsubscribe in a tornado.websocket.WebSocketHandler, and override on_message.

Originally I wanted to use asyncio together with asyncio-redis, which would have let me use the simpler aiohttp as the web framework. But when I tested sending commands with the salt client on the server, it turned out not to be compatible with Python 3 (https://github.com/saltstack/salt/issues/24750). So what is left for me to use?
Since the salt client does not support Python 3 yet, asyncio and aiohttp are out, so I'll start with Tornado; switching over later will be easy once support lands.
SocketIOSubscriber's on_message calls each subscriber's on_message, i.e. the handler's on_message. And when is SocketIOSubscriber's on_message called? Whenever a new message arrives in redis.
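A rough sketch of that plan, under my own assumptions (not code from saltobserver): it uses tornado-redis's SocketIOSubscriber with the subscribe(channel, subscriber)/unsubscribe(channel, subscriber) interface shown in its demos, and a made-up channel name that the redis returner would publish to.

```python
import tornado.ioloop
import tornado.web
import tornado.websocket
import tornadoredis
import tornadoredis.pubsub

# One shared subscriber; it listens to redis and fans messages out to handlers.
subscriber = tornadoredis.pubsub.SocketIOSubscriber(tornadoredis.Client())


class JobsHandler(tornado.websocket.WebSocketHandler):
    channel = 'salt:jobs'  # hypothetical channel the redis returner publishes to

    def open(self):
        # Register this connection; the subscriber calls our on_message()
        # whenever a new message arrives on the channel.
        subscriber.subscribe(self.channel, self)

    def on_message(self, message):
        # Relay the payload straight to the browser.
        self.write_message(message)

    def on_close(self):
        subscriber.unsubscribe(self.channel, self)


if __name__ == '__main__':
    app = tornado.web.Application([(r'/ws/jobs', JobsHandler)])
    app.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
```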


Today is October 8th, and yesterday's smog has cleared. It has been a bit over a month since I published this post, and the idea has reached its first version. I put it on GitHub; below is a preview.

assassin_preview

https://github.com/bung87/assassin — the project is named after a game I love, Assassin's Creed. I set up saltstack on two machines to inspect the data structures returned by various commands, then found some libraries to parse the results and implemented the features; it has not run in a production environment yet. Only ping got a rough data visualization, with animation done in raphael.js; ping, dig and traceroute all have basic tabular displays. The traceroute feature really ought to let you pick a node to run the test from, but my interest faded and I never got that far. Tasks use jQuery deferreds to update task state, with real-time client-side task status updates over the socket; the "process" state is implemented. Determining the available nodes before a task starts, determining which nodes have returned results after it finishes, setting timeouts, comparing the number of responding nodes, and so on — that part is not done.

Original: http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/ (Oct 1st, 2014). Spark Streaming has been getting more and more attention lately as a real-time data processing tool, and is often mentioned alongside Apache Storm. If you ask me, no real-time data processing tool is complete without Kafka integration, which is why I added a Spark Streaming example application to kafka-storm-starter that demonstrates how to read from Kafka and write to Kafka, using Avro as the data format and Twitter Bijection for data serialization.

What is Spark Streaming?

Spark Streaming is a sub-project of Apache Spark. Spark is a batch processing platform similar to Hadoop, and Spark Streaming is a real-time data processing tool that runs on top of the Spark engine.

Spark Streaming vs. Apache Storm

Spark Streaming's use cases closely overlap with those of Apache Storm, which is arguably today's most popular real-time big data processing platform. Bobby Evans and Tom Graves of Yahoo! Engineering recently gave a talk, Spark and Storm at Yahoo!, in which they compared the two platforms and also discussed when and why to choose one over the other. P. Taylor Goetz of HortonWorks shared a slide deck titled Apache Storm and Spark Streaming Compared. Here is my own very brief comparison: Storm has higher industry adoption and better production stability than Spark Streaming. On the other hand, Spark has a more expressive, higher-level API, which arguably makes it easier to use, at least if you write your Spark applications in Scala (I prefer the Spark API). But don't take my word for it; check out the talk and the slide deck above. Both Spark and Storm are top-level Apache projects, and vendors have begun to use one or both in their commercial offerings, e.g. HortonWorks (Storm, Spark) and Cloudera (Spark).

Excursus: Machines, cores, executors, tasks, and receivers in Spark

The later sections of this article talk a lot about parallelism in Spark and in Kafka. You need at least a basic understanding of some Spark terminology to follow those discussions.

  • A Spark cluster contains 1+ worker nodes aka slave machines (simplified view; I exclude pieces like cluster managers here.)
  • A worker node can run 1+ executors.
  • An executor is a process launched for an application on a worker node, which runs tasks and keeps data in memory or disk storage across them. Each application has its own executors. An executor has a certain amount of cores aka “slots” available to run tasks assigned to it.
  • A task is a unit of work that will be sent to one executor. That is, it runs (part of) the actual computation of your application. The SparkContext sends those tasks for the executors to run. Each task occupies one slot aka core in the parent executor.
  • A receiver (API, docs) is run within an executor as a long-running task. Each receiver is responsible for exactly one so-called input DStream (e.g. an input stream for reading from Kafka), and each receiver – and thus input DStream – occupies one core/slot.
  • An input DStream: an input DStream is a special DStream that connects Spark Streaming to external data sources for reading input data. For each external data source (e.g. Kafka) you need one such input DStream implementation. Once Spark Streaming is “connected” to an external data source via such input DStreams, any subsequent DStream transformations will create “normal” DStreams.

In Spark's execution model, each application gets its own executors, which stay up for the duration of the whole application and run 1+ tasks in multiple threads. This separation is similar to Storm's execution model. The architecture becomes more complex once you introduce cluster managers like YARN or Mesos, which I won't cover here. See Cluster Overview in the Spark docs for details.

Integrating Kafka with Spark Streaming

Overview

In short, Spark Streaming supports Kafka, but there are still some rough edges. A good starting point for me was the KafkaWordCount example in the Spark code base. After reading that code I still had some remaining questions. I wanted to understand how to:

  • Read from Kafka in parallel. In Kafka, a topic can have N partitions, and ideally we want to read from those N partitions in parallel. This is what the Kafka spout in Storm does.
  • Write to Kafka in parallel from a Spark Streaming application.

On top of those questions I also ran into several known issues in Spark and/or Spark Streaming, most of which have been discussed in the Spark mailing list. I’ll summarize the current state and known issues of the Kafka integration further down below.

Primer on topics, partitions, and parallelism in Kafka

For details see my articles Apache Kafka 0.8 Training Deck and Tutorial and Running a Multi-Broker Apache Kafka 0.8 Cluster on a Single Node. Kafka stores data in topics, and each topic consists of a configurable number of partitions. The number of partitions of a topic is very important for performance, because it is the upper bound on consumer parallelism: if a topic has N partitions, your application can consume the topic with at most N threads in parallel (at least this is the case when you use Kafka's built-in Scala/Java consumer API). When I say "application", I should really say consumer group in Kafka terminology. A consumer group is identified by a string of your choosing, cluster-wide. All consumers that are part of the same consumer group share the work of reading from a given Kafka topic, and at most N (= number of partitions) threads across all consumers of that group can read from the topic. Any additional threads will sit idle.

Multiple Kafka consumer groups can run in parallel: you can of course run multiple, independent logical consumer applications against the same Kafka topic. Each logical application will run its consumer threads under a unique consumer group id. Each application can also use a different level of read parallelism (see below). When I describe the various ways to configure read parallelism in the following sections, I am referring to a single one of these logical consumer applications.

Here are some simple examples.

  • Your application uses the consumer group id “terran” to read from a Kafka topic “zerg.hydra” that has 10 partitions. If you configure your application to consume the topic with only 1 thread, then this single thread will read data from all 10 partitions.
  • Same as above, but this time you configure 5 consumer threads. Here, each thread will read from 2 partitions.
  • Same as above, but this time you configure 10 consumer threads. Here, each thread will read from a single partition.
  • Same as above, but this time you configure 14 consumer threads. Here, 10 of the 14 threads will read from a single partition each, and the remaining 4 threads will be idle.
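To make the arithmetic in these examples concrete, here is a tiny self-contained Python model of spreading partitions over consumer threads (my own illustration, not Kafka's actual assignment algorithm):

```python
def assign_partitions(num_partitions, num_threads):
    """Spread partitions round-robin over consumer threads (simplified model).

    Index i of the result holds the partitions read by thread i; threads
    beyond the partition count end up with an empty list, i.e. idle.
    """
    assignment = [[] for _ in range(num_threads)]
    for partition in range(num_partitions):
        assignment[partition % num_threads].append(partition)
    return assignment


print(assign_partitions(10, 5))   # each of the 5 threads reads 2 partitions
print(assign_partitions(10, 14))  # 10 threads get one partition each, 4 stay idle
```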

Let's introduce some real-world complexity into this simple picture: rebalancing events in Kafka. Rebalancing is a lifecycle event in Kafka that is triggered when consumers join or leave a consumer group (there are more conditions that trigger rebalancing, but these are not important in this context; see my Kafka training deck for details on rebalancing).

  • Your application starts consuming with consumer group id "terran" and 1 thread. This thread reads from all 10 partitions. At runtime, you increase the number of threads to 14. Suddenly there is a change of parallelism within the same consumer group, which triggers a rebalancing in Kafka. Once rebalancing completes, 10 of the 14 threads will each consume a single partition and the remaining 4 threads will be idle. As you might have guessed, the original thread will now read from only one partition and no longer from the other nine.

We now have a basic understanding of topics and partitions, and that the number of partitions is an upper bound on the parallelism of reading from Kafka. But what are the resulting implications for an application – such as a Spark Streaming job or Storm topology – that reads its input data from Kafka?

  1. Read parallelism: You typically want to read from all N partitions of a Kafka topic in parallel, with N threads. And depending on the data volume you may want to spread those threads across different NICs, which usually means across different machines. In Storm, this is achieved by setting the parallelism of the Kafka spout to _N_ via TopologyBuilder#setSpout(). The Spark equivalent is a bit trickier, and I will describe how to do this in further detail below.
  2. Downstream processing parallelism: Once retrieved from Kafka you want to process the data in parallel. Depending on your use case this level of parallelism must be different from the read parallelism. If your use case is CPU-bound, for instance, you want to have many more processing threads than read threads; this is achieved by shuffling or "fanning out" the data via the network from the few read threads to the many processing threads. Hence you pay for the access to more cores with increased network communication, serialization overhead, etc. In Storm, you perform such a shuffling via a shuffle grouping from the Kafka spout to the next downstream bolt. The Spark equivalent is the repartition transformation on DStreams.

The important takeaway is that it is possible – and often desired – to decouple the level of parallelism for reading from Kafka and for processing the data once read. In the next sections I will describe the various options you have at your disposal to configure read parallelism and downstream processing parallelism in Spark Streaming.

Reading from Kafka

Read parallelism in Spark Streaming

Like Kafka, Spark Streaming has the concept of partitions. It is important to understand that Kafka's per-topic partitions are not correlated with the partitions of RDDs in Spark. The KafkaInputDStream of Spark Streaming – aka its Kafka "connector" – uses Kafka's high-level consumer API, which means you have two control knobs in Spark that determine read parallelism for Kafka:

  1. The number of input DStreams. Because Spark will run one receiver (= task) per input DStream, this means using multiple input DStreams will parallelize the read operations across multiple cores and thus, hopefully, across multiple machines and thereby NICs.
  2. The number of consumer threads per input DStream. Here, the same receiver (= task) will run multiple threads. That is, read operations will happen in parallel but on the same core/machine/NIC.

In practice, option 1 is what you want. Why? First and foremost, reading from Kafka is usually limited by network/NIC, i.e. you typically will not increase read throughput by running more threads on the same machine. In other words, CPU is unlikely to be the bottleneck when reading from Kafka. Second, if you go with option 2, multiple threads will compete for the lock that pushes data into so-called blocks (the += method of BlockGenerator that is used behind the scenes is synchronized on the block generator instance).

Number of partitions of the RDDs created by the input DStreams: The KafkaInputDStream will store individual messages received from Kafka into so-called blocks. From what I understand, a new block is generated every spark.streaming.blockInterval milliseconds, and each block is turned into a partition of the RDD that will eventually be created by the DStream. If this assumption of mine is true, then the number of partitions in the RDDs created by KafkaInputDStream is determined by batchInterval / spark.streaming.blockInterval, where batchInterval is the time interval at which streaming data will be divided into batches (set via a constructor parameter of StreamingContext). For example, if the batch interval is 2 seconds (default) and the block interval is 200ms (default), your RDD will contain 10 partitions. Please correct me if I’m mistaken.
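A quick sanity check of that formula with the default values quoted above (a trivial illustration, not Spark code):

```python
batch_interval_ms = 2000  # StreamingContext batch interval (default cited above)
block_interval_ms = 200   # spark.streaming.blockInterval (default cited above)

# If each block becomes one RDD partition, a batch's RDD has batch/block partitions.
print(batch_interval_ms // block_interval_ms)  # -> 10
```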

Option 1: Controlling the number of input DStreams

The example below is taken from the Spark Streaming Programming Guide.

val ssc: StreamingContext = ??? // ignore for now
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", /* ignore rest */)

val numInputDStreams = 5
val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }

In this example we create five input DStreams, thus spreading the burden of reading from Kafka across five cores and, hopefully, five machines/NICs (I say "hopefully" because I am not sure whether Spark Streaming's task placement policy will actually place the receivers on different machines). All input DStreams are part of the "terran" consumer group, and the Kafka API will ensure that these five input DStreams a) see all available data for the topic, because it assigns each partition of the topic to an input DStream, and b) do not see overlapping data, because each partition is assigned to only one input DStream at a time. In other words, this setup of "collaborating" input DStreams works because of the consumer group behavior provided by the Kafka API, which is used behind the scenes by KafkaInputDStream.

What I have not shown in this example is how many threads are created per input DStream, which is done via parameters to the KafkaUtils.createStream method (the actual input topic is also specified in the parameters of this method). We will do this in the next section. But before we continue, let me highlight several known issues with this setup and with Spark Streaming in particular, which are caused on the one hand by current limitations of Spark in general and on the other hand by the current implementation of the Kafka input DStream in particular:

[When you use the multi-input-stream approach I described above, then] those consumers operate in one [Kafka] consumer group, and they try to decide which consumer consumes which partitions. And it may just fail to do syncpartitionrebalance, and then you have only a few consumers actually consuming. To mitigate this problem, you can set rebalance retries very high, and pray it helps.

Next up is another "feature": if your receiver dies (OOM, hardware failure), you will stop receiving data from Kafka!

(spark-user discussion, markmail.org/message/…)

The "stop receiving from Kafka" issue requires some explanation. Currently, when you start your streaming application via ssc.start(), the processing starts and keeps going indefinitely – even if the input data source (e.g. Kafka) becomes unavailable. That is, the application cannot detect that it has lost connectivity to the upstream data source and thus cannot react to this event, e.g. by reconnecting or by stopping the execution. Similarly, if you lose a receiver that reads from the data source, your streaming application will just generate empty RDDs.

This is a rather unfortunate situation. One crude workaround is to restart your streaming application whenever it runs into an upstream data source failure or a receiver failure. This workaround may not help you, though, if your use case requires you to set the Kafka configuration option auto.offset.reset to "smallest" – because of a known bug in Spark Streaming, your streaming application may then end up doing something you don't want; see the section Known issues in Spark Streaming below for details.

Option 2: Controlling the number of consumer threads per input DStream

In this example we create a single input DStream and configure it to run three consumer threads – in the same receiver/task and thus on the same core/machine/NIC – to read from the Kafka topic "zerg.hydra".

val ssc: StreamingContext = ??? // ignore for now
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)

val consumerThreadsPerInputDstream = 3
val topics = Map("zerg.hydra" -> consumerThreadsPerInputDstream)
val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)

The KafkaUtils.createStream method is overloaded, so there are several variants with different method signatures. In this example we pick the Scala variant that gives us the most control.

Combining options 1 and 2

Here is a more complete example that combines the two techniques above:

val ssc: StreamingContext = ???
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)

val numDStreams = 5
val topics = Map("zerg.hydra" -> 1)
val kafkaDStreams = (1 to numDStreams).map { _ =>
  KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
}

Here we create five input DStreams, each of which runs a single consumer thread. If the topic "zerg.hydra" has five partitions (or less), then this is normally the best way to parallelize read operations if your primary concern is maximizing throughput.

Downstream processing parallelism in Spark Streaming

In the previous sections we covered reading from Kafka in parallel. Now we can tackle parallelizing the downstream data processing in Spark. Here, you must keep in mind how Spark itself parallelizes its processing. Like Kafka, Spark ties the parallelism to the number of (RDD) partitions by running one task per RDD partition (sometimes partitions are still called "slices" in the docs).

This is just like any other Spark application: once a Spark Streaming application has received its input data, any further processing is identical to a non-streaming Spark application. That is, you use exactly the same tools and patterns that you would use for "normal" Spark data flows. See Level of Parallelism in Data Processing.

This gives us two control knobs:

  1. The number of input DStreams, i.e. what we receive as a result of the previous sections on read parallelism. This is our starting point, which we can either take as-is or modify with the next option.
  2. The repartition DStream transformation. It returns a new DStream with an increased or decreased level _N_ of parallelism. Each RDD in the returned DStream has exactly _N_ partitions. DStreams are a continuous series of RDDs, and behind the scenes DStream.repartition calls RDD.repartition. The latter "reshuffles the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network." In other words, DStream.repartition is very similar to Storm's shuffle grouping.

Hence repartition is our primary means to decouple read parallelism from processing parallelism. It allows us to set the number of processing tasks and thus the number of cores that will be used for the processing. Indirectly, we also influence the number of machines/NICs that will be involved. A related DStream transformation is union. (This method also exists on StreamingContext, where it returns the unified DStream from multiple DStreams of the same type and same slide duration. Most likely you would use the StreamingContext variant.) A union returns a UnionDStream backed by a UnionRDD. A UnionRDD is comprised of all the partitions of the RDDs being unified, i.e. if you unite 3 RDDs with 10 partitions each, your union RDD instance will contain 30 partitions. In other words, union squashes multiple DStreams into a single DStream/RDD, but it does not change the level of parallelism. Whether you need union depends on whether your use case requires all the data from the Kafka partitions "in one place"; it is mostly a semantic need. One such example is when you need to perform a (global) count of distinct elements.

Note: RDDs are not ordered. When you union RDDs, the resulting RDD will not have a well-defined ordering either. Sort the RDD if you need ordering.

Your use case will determine which knobs and which combination thereof you need to use. Let’s say your use case is CPU-bound. Here, you may want to consume the Kafka topic “zerg.hydra” (which has five Kafka partitions) with a read parallelism of 5 – i.e. 5 receivers with 1 consumer thread each – but bump up the processing parallelism to 20:

val ssc: StreamingContext = ???
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
val readParallelism = 5
val topics = Map("zerg.hydra" -> 1)

val kafkaDStreams = (1 to readParallelism).map { _ =>
  KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
}
// collection of five *input* DStreams = handled by five receivers/tasks

val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
// single DStream

val processingParallelism = 20
val processingDStream = unionDStream.repartition(processingParallelism)
// single DStream but now with 20 partitions

In the next section we will tie all these pieces together and also cover the actual data processing.

Writing to Kafka

Writing to Kafka should be done via the foreachRDD output operation:

The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, like saving the RDD to files, or writing it over the network to a database. Note that the function func is executed at the driver, and will usually have RDD actions in it that will force the computation of the streaming RDDs. Note: The remark "the function func is executed at the driver" does not mean that, say, a Kafka producer itself would be run from the driver. Rather, read this remark more as "the function func is evaluated at the driver". The actual behavior will become more clear once you read Design Patterns for using foreachRDD.

You should read the section Design Patterns for using foreachRDD in the Spark docs, which explains the recommended patterns as well as common pitfalls when using foreachRDD to talk to external systems. In my case, I decided to follow the recommendation to re-use Kafka producer instances across multiple RDDs/batches via a pool of producers. I implemented such a pool with Apache Commons Pool, see PooledKafkaProducerAppFactory. Factories are helpful in this context because of Spark’s execution and serialization model. The pool itself is provided to the tasks via a broadcast variable. The end result looks as follows:

val producerPool = {
  // See the full code on GitHub for details on how the pool is created
  val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
  ssc.sparkContext.broadcast(pool)
}

stream.map { ... }.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // Get a producer from the shared pool
    val p = producerPool.value.borrowObject()
    partitionOfRecords.foreach { case tweet: Tweet =>
      // Convert pojo back into Avro binary format
      val bytes = converter.value.apply(tweet)
      // Send the bytes to Kafka
      p.send(bytes)
    }
    // Returning the producer to the pool also shuts it down
    producerPool.value.returnObject(p)
  })
})

Keep in mind that Spark Streaming creates many RDDs per minute, each of which contains multiple partitions, so preferably you shouldn’t create new Kafka producers for each partition, let alone for each Kafka message. The setup above minimizes the creation of Kafka producer instances, and also minimizes the number of TCP connections that are being established with the Kafka cluster. You can use this pool setup to precisely control the number of Kafka producer instances that are being made available to your streaming application (if in doubt, use fewer).

Complete example

The code example below is the gist of my example Spark Streaming application (see the full code for details and explanations). Here, I demonstrate how to:

  • Read Avro-encoded data (the Tweet class) from a Kafka topic in parallel. We use the optimal read parallelism of one single-threaded input DStream per Kafka partition.
  • Deserialize the Avro-encoded data back into pojos, then serialize them back into binary. The serialization is performed via Twitter Bijection.
  • Write the results back into a different Kafka topic via a Kafka producer pool.
// Set up the input DStream to read from Kafka (in parallel)
val kafkaStream = {
  val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
  val kafkaParams = Map(
    "zookeeper.connect" -> "zookeeper1:2181",
    "group.id" -> "spark-streaming-test",
    "zookeeper.connection.timeout.ms" -> "1000")
  val inputTopic = "input-topic"
  val numPartitionsOfInputTopic = 5
  val streams = (1 to numPartitionsOfInputTopic) map { _ =>
    KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)
  }
  val unifiedStream = ssc.union(streams)
  val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in production.
  unifiedStream.repartition(sparkProcessingParallelism)
}

// We use accumulators to track global "counters" across the tasks of our streaming app
val numInputMessages = ssc.sparkContext.accumulator(0L, "Kafka messages consumed")
val numOutputMessages = ssc.sparkContext.accumulator(0L, "Kafka messages produced")
// We use a broadcast variable to share a pool of Kafka producers, which we use to write data from Spark to Kafka.
val producerPool = {
  val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
  ssc.sparkContext.broadcast(pool)
}
// We also use a broadcast variable for our Avro Injection (Twitter Bijection)
val converter = ssc.sparkContext.broadcast(SpecificAvroCodecs.toBinary[Tweet])

// Define the actual data flow of the streaming job
kafkaStream.map { case bytes =>
  numInputMessages += 1
  // Convert Avro binary data to pojo
  converter.value.invert(bytes) match {
    case Success(tweet) => tweet
    case Failure(e) => // ignore if the conversion failed
  }
}.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    val p = producerPool.value.borrowObject()
    partitionOfRecords.foreach { case tweet: Tweet =>
      // Convert pojo back into Avro binary format
      val bytes = converter.value.apply(tweet)
      // Send the bytes to Kafka
      p.send(bytes)
      numOutputMessages += 1
    }
    producerPool.value.returnObject(p)
  })
})

// Run the streaming job
ssc.start()
ssc.awaitTermination()

See the full source code for further details and explanations. Personally, I really like the conciseness and expressiveness of the Spark Streaming code. As Bobby Evans and Tom Graves allude to in their talk, the Storm equivalent of this code is more verbose and comparatively lower level: The KafkaStormSpec in kafka-storm-starter wires and runs a Storm topology that performs the same computations. Well, the spec file itself is only a few lines of code once you exclude the code comments, which I only keep for didactic reasons; however, keep in mind that in Storm’s Java API you cannot use Scala-like anonymous functions as I show in the Spark Streaming example above (e.g. the map and foreach steps). Instead you must write “full” classes – bolts in plain Storm, functions/filters in Storm Trident – to achieve the same functionality, see e.g. AvroDecoderBolt. This feels a bit similar to, say, having to code against Spark’s own API using Java, where juggling with anonymous functions is IMHO just as painful. Lastly, I also liked the Spark documentation. It was very easy to get started, and even some more advanced use is covered (e.g. Tuning Spark). I still had to browse the mailing list and also dive into the source code, but the general starting experience was ok – only the Kafka integration part was lacking (hence this blog post). Good job to everyone involved maintaining the docs!

Known issues in Spark Streaming

As you might have guessed by now, there are indeed a number of unresolved issues in Spark Streaming. I'll try to summarize my findings. On the one hand there is some confusion about how to correctly read from and write to Kafka, which you can follow in mailing list discussions such as Multiple Kafka Receivers and Union and How to scale more consumer to Kafka stream. On the other hand there are apparently some inherent issues in Spark Streaming, notably data loss in failure scenarios. In other words, issues you don't want to run into in production!

  • The current (v1.1) driver in Spark does not recover such raw data that has been received but not processed (source). Here, your Spark application may lose data under certain conditions. Tathagata Das points out that driver recovery should be fixed in Spark v1.2, which will be released around the end of 2014.
  • The current Kafka “connector” of Spark is based on Kafka’s high-level consumer API. One effect of this is that Spark Streaming cannot rely on its KafkaInputDStream to properly replay data from Kafka in case of a downstream data loss (e.g. Spark machines died).
    • Some people even advocate that the current Kafka connector of Spark should not be used in production because it is based on the high-level consumer API of Kafka. Instead Spark should use the simple consumer API (like Storm’s Kafka spout does), which allows you to control offsets and partition assignment deterministically.
  • The Spark community has been working on filling the previously mentioned gap with e.g. Dibyendu Bhattacharya’s kafka-spark-consumer. The latter is a port of Apache Storm’s Kafka spout, which is based on Kafka’s so-called simple consumer API, which provides better replaying control in case of downstream failures.
  • Even given those volunteer efforts, the Spark team would prefer to not special-case data recovery for Kafka, as their goal is “to provide strong guarantee, exactly-once semantics in all transformations” (source), which is understandable. On the flip side it still feels a bit like a wasted opportunity to not leverage Kafka’s built-in replaying capabilities. Tough call!
  • SPARK-1340: In the case of Kafka input DStreams, receivers are not getting restarted if the worker running the receiver fails. So if a worker dies in production, you will simply miss the data the receiver(s) was/were responsible to retrieve from Kafka.
  • See also Failure of a Worker Node for further discussions on data loss scenarios (“lost input data!”) as well as data duplication scenarios (“wrote output data twice!”). Applies to Kafka, too.
  • Spark’s usage of the Kafka consumer parameter auto.offset.reset is different from Kafka’s semantics. In Kafka, the behavior of setting auto.offset.reset to “smallest” is that the consumer will automatically reset the offset to the smallest offset when a) there is no existing offset stored in ZooKeeper or b) there is an existing offset but it is out of range. Spark however will always remove existing offsets and then start all the way from zero again. This means whenever you restart your application with auto.offset.reset = "smallest", your application will completely re-process all available Kafka data. Doh! See this discussion and that discussion.
  • SPARK-1341: Ability to control the data rate in Spark Streaming. This is relevant in so far that if you are already in trouble because of the other Kafka-related issues above (e.g. the auto.offset.reset misbehavior), then what may happen is that your streaming application must or thinks it must re-process a lot of older data. But since there is no built-in rate limitation this may cause your workers to become overwhelmed and run out of memory.

Apart from those failure handling and Kafka-focused issues there are also scaling and stability concerns. Again, please refer to the Spark and Storm talk of Bobby and Tom for further details. Both of them have more experience with Spark than I do. I also came across one comment that there may be issues with the (awesome!) G1 garbage collector that is available in Java 1.7.0u4+, but I didn’t run into any such issue so far.

Security:

Qiniu: For user-uploaded data, at least three replicas are created and stored across multiple IDC data centers, ensuring high availability and reliability. In addition, users can set different access permissions and access levels through Qiniu's authentication mechanism. Upyun: Three replicas across data centers, keeping data absolutely safe.

Distribution:

Qiniu: More than 500 acceleration nodes around the world; the node closest to the user is chosen automatically, so both upload and download speeds are optimized. Compared with a traditional IDC, access is several times faster. Upyun: More than 60 nodes across China, covering China Telecom, China Unicom, China Mobile, CERNET and regional broadband; uploads and downloads go to the nearest region; no bandwidth cap. Upyun claims a 60%-300% speedup.

Data processing:

Images:

Qiniu:

1. Thumbnails (scaling to multiple sizes)
2. Image and text watermarks
3. Custom crop regions
4. Rotation
5. Image quality adjustment
6. Image format conversion
7. Original-image protection
8. Hotlink protection

Upyun: Uses lossless image compression and dynamically generates whatever thumbnail sizes the business needs; features include watermarking, cropping, rotation and format conversion.

Video:

Qiniu: Supports the usual ffmpeg audio/video format conversions, e.g. mp3, aac, m4a, mp4, avi, flv. Supports video frame extraction, i.e. taking a screenshot at any point in a video. Supports the HLS streaming protocol. Upyun: The homepage only mentions "audio/video transcoding services".

SDK

Qiniu: SDKs for Java, C, C++, C#, PHP, Python, Ruby, iOS, Android, Go, Node.js and more. http://developer.qiniu.com/docs/v6/sdk/ Upyun: Provides HTTP RESTful API, FORM API and FTP interfaces, plus SDKs for Java, PHP, Python, Node.js, C, C#, Go, iOS, Android, etc., so development teams can adopt Upyun quickly. http://docs.upyun.com/download/#sdk (no Go or C# SDK downloads are actually listed on that page)

Elasticsearch does no authentication or authorization, leaving that as an exercise for the developer. This article gives an overview of what to keep in mind when configuring the security of an Elasticsearch cluster and giving users (limited) access to it, when you cannot necessarily trust them (completely).

Introduction

As an Elasticsearch provider, security is very important to Found: we need to protect our users from other, possibly malicious, users, and also protect users from getting themselves into trouble. This article builds on Elasticsearch in production, where we covered many production-related topics. Here we expand on some of them and describe what to keep in mind when actually implementing things. Essentially, you need to carefully inspect every request you send to Elasticsearch, just as with any other database. Unlike most other databases, however, Elasticsearch has a feature that allows arbitrary code execution, which brings some interesting challenges! We will look at the nuances and risks of the different levels of trust you can give your users, from full access to submit arbitrary requests at one end, to requests with only pre-defined parameters at the other.

Responsibility

Elasticsearch has no concept of a user. Essentially, anyone who can send arbitrary requests to your cluster is a "superuser".

If you have used systems such as PostgreSQL, where fine-grained access control lets you restrict access to databases, tables, functions and so on, you may be tempted to look for a way to restrict certain operations or certain indexes. Currently, Elasticsearch does not consider that to be its job: it has no concept of a user, and anyone who can send requests to the cluster is effectively a superuser. This is a reasonable limitation to impose: there are so many ways to implement authentication and authorization schemes, and many of them are tightly coupled to the application domain. Much of the advice here applies to other search engines and databases as well, not just Elasticsearch. We do not mean that these systems are inherently insecure, nor do we criticize their choices; it is a perfectly reasonable decision to leave security to the user. However, we do want to raise awareness of the security-related aspects of Elasticsearch.

Goals

In order of importance, we want to achieve the following:

  1. Prevent use of the scripting feature to execute arbitrary code. If we cannot do this, everything else is merely "security through obscurity" and can be bypassed. Disabling dynamic scripting does raise some challenges of its own, however.
  2. Limit who can access what, for both searching and indexing. This can be implemented with a proxy layer.
  3. Prevent requests that can overwhelm the cluster and cause a denial of service. This is hard to avoid completely if arbitrary search requests are allowed.

For development purposes, we will also see how these points apply even when running Elasticsearch locally. As an example of different trust levels, assume you host several installations of the same CMS for different customers. You trust the CMS not to do crazy things, but requiring authentication and keeping separate indexes is a good idea just in case. So you can allow the CMS to make arbitrary requests against the indexes it is allowed to use. The CMS itself, however, is exposed to the entire world. It does not accept arbitrary requests: it converts search parameters into proper Elasticsearch requests and sends them to the appropriate index.

Scripting for Fun and Profit

End users who can provide scripts must be assumed to have shell access to your host.

Elasticsearch has very powerful scripting facilities. These are important in many contexts: updates, facets, filters, scoring, and so on. These scripts are not run in a sandbox. Consequently, an end user who can provide scripts must be assumed to have the equivalent of shell access to your host. Nothing prevents a script from making a second request back to Elasticsearch, thus bypassing any URL-based access control, or from doing anything Elasticsearch itself has permission to do. As a result, if you cannot completely trust your users, dynamic scripts must be disabled. If you do keep dynamic scripting enabled, we recommend blacklisting or sanitizing unsafe scripts, but since scripts are not sandboxed it is hard to prove that a script does nothing bad. The history of security problems with Flash and Java applets shows how difficult it is to build a sandbox without holes. Moreover, it is provably impossible to reason about whether a script will terminate, or whether it will spin forever and cause a denial of service.

Scripting without Dynamic Scripts

We have stressed the importance of disabling dynamic scripts. Scripts matter for many things, though, so we need to make sure we can still do those things. Let's look at some examples of using preloaded scripts that take parameters at search time. Preloaded scripts can be placed in the config/scripts directory. Suppose we have the following script:

$ cat config/scripts/scoring/recency_boost.mvel
(0.08 / ((3.16*pow(10,-11)) * abs(now - doc['timestamp'].date.getMillis()) + 0.05)) + 1.0

We can then specify {"script": "scoring_recency_boost", "params": {"now": 1386176910000}}. The script parameter is a path relative to config/scripts, with _ as the separator; note the prefix is scoring_ and not scoring/. Using preloaded scripts has another benefit: your script definitions are specified in one place instead of being scattered across the various applications using the Elasticsearch cluster. This helps the maintainability of your search applications and configuration: you can change and improve your scripts without having to change every client.
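As a hedged illustration of calling such a preloaded script with a search-time parameter, here is a sketch using Python's requests library, assuming Elasticsearch 1.x's function_score/script_score syntax, a node on localhost:9200, and a made-up index name:

```python
import requests

query = {
    "query": {
        "function_score": {
            "query": {"match_all": {}},
            "script_score": {
                "script": "scoring_recency_boost",  # resolved from config/scripts
                "params": {"now": 1386176910000},   # supplied at search time
            },
        }
    }
}

# "articles" is a placeholder index name.
resp = requests.post("http://localhost:9200/articles/_search", json=query)
print(resp.json()["hits"]["total"])
```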

Limiting Indexes and Operations

Note: this section assumes the HTTP API is used. Currently, there is no easy way to limit what native clients can do. Elasticsearch has many ways of specifying which indexes to search across or index to. If you have different users on the same shared cluster and let them send arbitrary search requests (even without scripts), you will probably also want to limit which indexes they can touch. Typically, the indexes are specified in the request URL, e.g. index_pattern/type_pattern/_search. However, APIs such as multi-search, multi-get and bulk can also take an index parameter in the request body, overriding which indexes are searched and which documents are indexed. The allow_explicit_index configuration option, introduced in 0.90.4, lets you forbid these overrides ([url-based access control](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/url-access-control.html)). Note that the index in the URL is really an index pattern and need not be an index name. If you prefix index names with a per-user identifier, you must account for index patterns: for example, index_name = "user123_" + user_specified_index will not work well if user_specified_index = ",*", since the request ends up looking like user123_,*/_search, which searches every index. With disable_dynamic_scripts set to true and allow_explicit_index set to false, you can be certain that requests to the _search and _msearch/_mget endpoints only touch the allowed indexes, and similarly for indexing requests to _bulk. This makes it possible to restrict at a proxy layer which indexes can be reached. If you also need to strictly limit which documents can be operated on, you can use a filtered alias: the filter is applied to any search, count, and delete-by-query-like request. If you rely on filtered aliases, make sure the underlying indexes cannot be accessed directly. Besides restricting which indexes and endpoints your users can request, you must also consider which HTTP methods to allow. You probably don't want just anyone to be able to DELETE an index. A good practice is to keep requests idempotent, forbid requests directly to an index, and only allow POST-ing to _search and _bulk and PUT-ing and DELETE-ing specific documents.
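As a rough illustration of the kind of proxy-layer whitelist described above (entirely my own sketch; the user id, index name and allowed endpoints are made up), the check could look like this:

```python
import re

# Hypothetical rules: user "user123" may only touch its own index via these endpoints.
ALLOWED = [
    ("POST", re.compile(r"^/user123_docs/_search$")),
    ("POST", re.compile(r"^/user123_docs/_bulk$")),
    ("PUT", re.compile(r"^/user123_docs/doc/[\w-]+$")),
    ("DELETE", re.compile(r"^/user123_docs/doc/[\w-]+$")),
]

SAFE_INDEX = re.compile(r"^[a-z0-9_-]+$")  # rejects ",", "*" and other pattern characters


def is_allowed(method, path, user_index=None):
    """Return True if the proxy should forward this request to Elasticsearch."""
    if user_index is not None and not SAFE_INDEX.match(user_index):
        return False  # e.g. ",*" would turn a prefixed index name into a match-all pattern
    return any(method == m and pattern.match(path) for m, pattern in ALLOWED)


print(is_allowed("POST", "/user123_docs/_search"))  # True
print(is_allowed("DELETE", "/user123_docs"))        # False: deleting the whole index
print(is_allowed("POST", "/_msearch"))              # False: the body could name other indexes
```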

Preventing Denial of Service

Requests that crash the cluster or severely degrade its performance are not as harmful as exposing data, but they still need to be avoided. Unfortunately, avoiding them is not as easy as changing a configuration variable. Elasticsearch in production has a section on OutOfMemory-caused crashes. In short, running out of memory has disastrous effects on a cluster and must never happen in production. There are many ways Elasticsearch can consume lots of memory; a non-exhaustive list of examples:

  • Field caches for fields to facet, sort and script on.
  • Filter caches.
  • Segments pending flushing.
  • Index metadata.

Loading an overly large field is probably the most common cause of out-of-memory errors. Two important improvements arriving with Elasticsearch 1.0 will handle this better. The first is doc values: by mapping fields to use doc values, Elasticsearch writes the values in a way that relies on the operating system's page cache to use them efficiently. This can greatly reduce heap usage, although this implementation is somewhat slower. The second improvement is a circuit breaker. Its purpose is to put a hard limit on how much memory can be used to load fields; if the limit is exceeded, the request is aborted. It is disabled by default, but with a reasonable limit, a request that tries to load too much will be aborted with a CircuitBreakingException, which is a lot safer than an OutOfMemory error! Both require some configuration tuning and planning. And even though memory usage improves a lot, there is still a real performance impact: mistakenly loading a huge field may evict fields that are actually needed, forcing them to be loaded again.

If you accept documents of arbitrary structure, you may also want to disable dynamic mapping. Although Elasticsearch is often described as a schema-free database, it implicitly creates a schema. This works well during development, but you probably want to turn it off in production. For example, it can cause problems if values end up as keys in your objects: suppose you have an object like {"access": [ {"123": "read"}, {"124": "write"} ]}. It looks harmless, but it will create a mapping entry for every ID, and with thousands of such keys the size of the mapping explodes, since every key gets its own entry. The mapping is also part of the cluster state, which is replicated to every node. This pattern may work fine with document-oriented databases that have no notion of a schema and treat documents as blobs, but in Elasticsearch you should never use values as keys. This example should instead be modelled as {"access": [ {"user_id": 123, "level": "read"}, {"user_id": 124, "level": "write"} ]}, with `access` as a nested type.

In summary, while it is possible to restrict which indexes can be searched or indexed to with a simple proxy layer, it is impossible to accept arbitrary requests without worrying about the stability and performance of the cluster. This is true for any database or search engine and should come as no surprise. We see many implementations of this pattern: for Kibana it makes a lot of sense, because it is largely an Elasticsearch dashboard. But if you copy its usage pattern and re-implement it in your end-user-facing application, you also bring along the risks mentioned here.
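To make the values-as-keys pitfall concrete, here is a small self-contained Python sketch (my own illustration) that rewrites the problematic document shape into the recommended one before indexing:

```python
def fix_access_entries(doc):
    """Turn {"access": [{"123": "read"}, ...]} into
    {"access": [{"user_id": 123, "level": "read"}, ...]} so user IDs become
    field values instead of new mapping keys."""
    fixed = []
    for entry in doc.get("access", []):
        for user_id, level in entry.items():
            fixed.append({"user_id": int(user_id), "level": level})
    return dict(doc, access=fixed)


print(fix_access_entries({"access": [{"123": "read"}, {"124": "write"}]}))
# {'access': [{'user_id': 123, 'level': 'read'}, {'user_id': 124, 'level': 'write'}]}
```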

Staying Safe while Developing with Elasticsearch

Elasticsearch typically binds to localhost over HTTP. Intuitively, outsiders cannot connect to something listening on localhost or protected by the company firewall. However, your browser can reach your localhost, and possibly servers on your internal network as well. Any website you visit can send requests to your locally running Elasticsearch node: your browser will happily make HTTP requests to 127.0.0.1/_search. So any website can explore whatever data is in your local Elasticsearch, and then POST it somewhere else. Tuning the settings for cross-origin resource sharing helps a bit, though searches can still be made through JSONP requests. Our warnings about dynamic scripting apply here as well: you certainly don't want any website to be able to run code on your machine. We recommend running Elasticsearch in a virtual machine if you browse the web on the same machine you develop on, not keeping sensitive data locally, and disabling dynamic scripts.

Suggested Solutions

Restricting access to indexes and adding authentication and SSL can be done with a number of tools; the details are beyond the scope of this article. Nginx is popular for this, and there are various Elasticsearch plugins that try to add basic authentication. At Found we provide these as part of the proxy layer that routes requests. You can configure ACLs with HTTP basic authentication, SSL, and restrictions on which methods and paths can be accessed, and these rules can be combined to fit almost any situation. Most Elasticsearch clients now support HTTP basic auth, SSL and other goodies, including the official clients, so there is no reason not to use them.

Although Elasticsearch is multi-tenant and can happily serve many different users and applications on the same cluster, at some point you may want to create multiple clusters to partition resources and provide extra security. Essentially, this limits the blast radius of the problems and risks described in the Preventing Denial of Service section. For example, a huge spike in logging throughput (you do capture and analyze logs with Logstash and Kibana, right?) should not affect the performance of more important applications. Today, it's easier than ever to use technologies like LXC and Docker to isolate processes and constrain resource usage like disk space, memory and CPU. That is exactly what we do at Found. Customer clusters are completely isolated and resources are dedicated, not overprovisioned. This is very important; without these practices we would not be able to guarantee an acceptable level of security, and performance would be unreliable.

Key Takeaways

This article has covered a lot of ground, but be sure to keep in mind the following.

  1. Disable dynamic scripts. They are dangerous.
  2. Understand the sometimes tricky configuration is required to limit access controls to indexes.
  3. Consider the performance implications of multiple tenants, a weakness or a bad query in one can bring down an entire cluster!

Cool things keep being added to Elasticsearch at an exciting pace. Some improvements will make life easier when dealing with the challenges mentioned here; some will bring new challenges. Either way, you have to treat security as your own responsibility. Remember that security is like an onion: a good strategy is to have multiple layers. Don't let Elasticsearch be the weak layer in your application's security onion!

Original: https://www.found.no/foundation/elasticsearch-security/

Authorization and authentication in Hadoop is a topic that is easy to get confused about. The first and most important thing is to recognize the subtle but extremely important difference between the two, so let's define the terms. Authentication: the purpose of authentication is to confirm that a user claiming to be a certain identity really is that user. Authorization: the function of authorization is to define access rights to resources. Put simply, authentication is a way of saying who I am, while authorization decides what I am allowed to do.

Authentication

With an entirely default configuration, Hadoop does not authenticate users at all. This is an important realization, because it can have serious consequences in a corporate data center. Consider this example. Suppose user Joe has access to a Hadoop cluster. The cluster does not have any Hadoop security features enabled, which means no attempt is made to verify the identity of users. The cluster's superuser is hdfs, and Joe does not have the hdfs user's password. However, Joe happens to have a client machine configured with access to the Hadoop cluster, and Joe is having a bad day. He runs:

sudo useradd hdfs

sudo -u hdfs hadoop fs -rmr /

The cluster dutifully does the work and comes back saying "Ok, hdfs, I deleted everything!". So what happened? On a non-secure cluster, the NameNode and the JobTracker do not require any authentication. If you make a request claiming to be hdfs or mapred, the NN/JT will say "ok, I believe that" and allow you to do anything hdfs or mapred is allowed to do. Hadoop has the ability to require authentication, in the form of Kerberos. Kerberos is an authentication protocol that uses "tickets" to allow nodes to identify themselves; if you want to dig into Kerberos, I highly recommend the Wikipedia page on "Kerberos". Hadoop can use the Kerberos protocol to ensure that when someone makes a request, they really are who they say they are. This mechanism is used throughout the cluster: in a secure Hadoop configuration, all Hadoop daemons use Kerberos to perform mutual authentication, which means that when two daemons talk to each other, they each make sure the other daemon is who it says it is. Additionally, this allows the NameNode and JobTracker to ensure that any HDFS or MR request is executed with the appropriate authorization level.

Authorization

Authorization is very different from authentication. Authorization tells us what any given user can or cannot do within a Hadoop cluster, after the user has been successfully authenticated. In HDFS this is primarily governed by file permissions. HDFS file permissions are very similar to BSD file permissions. If you have ever run ls -l in a directory, you have probably seen records like this:

drwxr-xr-x 2 natty hadoop 4096 2012-03-01 11:18 foo

-rw-r--r-- 1 natty hadoop 87 2012-02-13 12:48 bar

On the far left there is a string of letters. The first letter determines whether the entry is a directory or not, and then there are three sets of three letters each. Those sets denote owner, group and other-user permissions, and "rwx" are the read, write and execute permissions. The "natty hadoop" part says the file is owned by natty and belongs to the hadoop group. As an aside, a stated intent is that HDFS semantics should be "as Unix-like as possible". The result is that certain HDFS operations follow BSD semantics, and others are closer to Unix semantics. The real question here is: what is a user or a group in Hadoop? The answer: they are strings of characters. Nothing more. Hadoop will happily let you run the following command:

hadoop fs -chown fake_user:fake_group /test-dir

The side effect is that if that user and group really don't exist, nobody will be able to access the file except the superusers, which by default include hdfs, mapred and other members of the hadoop supergroup. In a MapReduce context, users and groups are used to determine who is allowed to submit or modify jobs. In MapReduce, jobs are submitted to queues controlled by the scheduler. Administrators can define who is allowed to submit jobs to particular queues via MapReduce ACLs. These ACLs can also be defined on a job-by-job basis. Similar to HDFS permissions, if the specified users or groups don't exist, the queues will be unusable, except by superusers, who can always submit or modify jobs. The next question to ask is: how do the NameNode and JobTracker figure out which groups a user belongs to? When a user runs a Hadoop command, the NameNode or JobTracker gets some information about the user running that command. Most importantly, it knows the user's username. The daemon then uses that username to determine which groups the user belongs to. This is done through a pluggable interface that accepts a username and maps it to the groups that user belongs to. In a default installation, the user-to-group mapping implementation forks off a subprocess that runs id -Gn [username], which produces a listing like this:

natty@vorpal:~/cloudera $ id -Gn natty
natty adm lpadmin netdev admin sambashare hadoop hdfs mapred

The Hadoop daemons then use this group list, along with the username, to determine whether the user has the appropriate permissions to access the requested file. There are also other implementations that ship with Hadoop, including one that allows the system to be configured to get user-group mappings from an LDAP or Active Directory system. This is useful if the groups needed for setting up permissions live in an LDAP system but not on the Unix cluster hosts. Something to be aware of is that the set of groups the NameNode and JobTracker are aware of may differ from the groups the user belongs to on a client machine. All authorization is done at the NameNode/JobTracker level, so the users and groups on the DataNodes and TaskTrackers don't affect authorization, although they may be necessary if Kerberos authentication is enabled. Additionally, it is very important that the NameNode and the JobTracker both see the same groups for any given user, or there may be undefined results when executing jobs. If there is ever any doubt about which groups a user belongs to, hadoop dfsgroups and hadoop mrgroups can be used to find out which groups a user belongs to according to the NameNode and the JobTracker, respectively.
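A rough Python sketch (my own illustration) of what that default shell-based user-to-groups mapping boils down to:

```python
import subprocess


def groups_for_user(username):
    """Mimic Hadoop's default mapping: shell out to `id -Gn <username>`
    and return the group names as a list of strings."""
    output = subprocess.check_output(["id", "-Gn", username])
    return output.decode().split()


# e.g. groups_for_user("natty") -> ['natty', 'adm', ..., 'hadoop', 'hdfs', 'mapred']
```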

Putting it all together

A proper, secure setup for Hadoop will likely require combining authorization and authentication. Administrators should look at their security requirements and determine which solutions are right for them, and how much risk they can take on in their handling of data. Additionally, if you are going to enable Hadoop's Kerberos features, I strongly recommend looking into Cloudera Manager, which helps make the Kerberos configuration and setup quite easy. Original: http://blog.cloudera.com/blog/2012/03/authorization-and-authentication-in-hadoop/

Configuring internal authorization

CassandraAuthorizer is one of many possible IAuthorizer implementations, and the one that stores permissions in the system_auth.permissions table to support all authorization-related CQL statements. Configuration consists mainly of changing the authorizer option in the cassandra.yaml to use the CassandraAuthorizer.

Procedure

  1. In the cassandra.yaml file, comment out the default AllowAllAuthorizer and add the CassandraAuthorizer: authorizer: CassandraAuthorizer. You can use any authenticator except AllowAll.
  2. Configure the replication factor for the system_auth keyspace to increase the replication factor to a number greater than 1.
  3. Adjust the validity period for permissions caching by setting the permissions_validity_in_ms option in the cassandra.yaml file. Alternatively, disable permission caching by setting this option to 0.

Configuring authentication

To configure Cassandra to use internal authentication, first make a change to the cassandra.yaml file and increase the replication factor of the system_auth keyspace, as described in this procedure. Next, start up Cassandra using the default user name and password (cassandra/cassandra), and start cqlsh using the same credentials. Finally, use these CQL statements to set up user accounts to authorize users to access the database objects:

  • ALTER USER

    ALTER USER user_name
    WITH PASSWORD 'password' ( NOSUPERUSER | SUPERUSER )

  • CREATE USER

    CREATE USER IF NOT EXISTS user_name WITH PASSWORD 'password'
    ( NOSUPERUSER | SUPERUSER )
    

Procedure

  1. Change the authenticator option in the cassandra.yaml to PasswordAuthenticator.

    By default, the authenticator option is set to AllowAllAuthenticator.

    authenticator: PasswordAuthenticator

  2. Increase the replication factor for the system_auth keyspace to N (number of nodes).

    If you use the default, 1, and the node with the lone replica goes down, you will not be able to log into the cluster because the system_auth keyspace was not replicated.

  3. Restart the Cassandra client.

    The default superuser name and password that you use to start the client is stored in Cassandra.

    -u cassandra -p cassandra

  4. Start cqlsh using the superuser name and password.

    ./cqlsh -u cassandra -p cassandra

  5. Create another superuser, not named cassandra. This step is optional but highly recommended.

  6. Log in as that new superuser.
  7. Change the cassandra user password to something long and incomprehensible, and then forget about it. It won’t be used again.
  8. Take away the cassandra user’s superuser status.
  9. Use the CQL statements listed previously to set up user accounts and then grant permissions to access the database objects.
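Once PasswordAuthenticator is enabled, every client has to present credentials. As a hedged illustration of steps 4 and 5 using the DataStax Python driver (my own sketch, not part of the documented procedure; the host and the new superuser name are made up):

```python
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# Log in with the default superuser (step 4), then create a new one (step 5).
auth = PlainTextAuthProvider(username="cassandra", password="cassandra")
cluster = Cluster(["127.0.0.1"], auth_provider=auth)
session = cluster.connect()

session.execute("CREATE USER admin WITH PASSWORD 'a-long-random-password' SUPERUSER")
cluster.shutdown()
```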

Source: http://www.datastax.com/documentation/cassandra/2.0/cassandra/security/secure_config_native_authorize_t.html

[root@localhost kafka_2.10-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Download slf4j:

tar xzf slf4j-1.7.7.tar.gz
ln -s slf4j-1.7.7/slf4j-simple-1.7.7.jar kafka_2.10-0.8.1.1/libs

Download: Hive Releases. Go into the download directory:

$ tar -xzvf hive-x.y.z.tar.gz
$ cp hive-default.xml.template hive-site.xml

Set environment variables:

$ export HIVE_HOME=/path/to/hive/home
$ export PATH=$HIVE_HOME/bin:$PATH #
$ export METASTORE_PORT=9083

Running Hive. Hive uses Hadoop, so you must have Hadoop in your path, or:

export HADOOP_HOME=<hadoop-install-dir>

In addition, you must create /tmp and /user/hive/warehouse (aka hive.metastore.warehouse.dir) and set them chmod g+w in HDFS before you can create a table in Hive. Commands to perform this setup:

$ $HADOOP_HOME/bin/hadoop fs -mkdir       /tmp
$ $HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse

Running HCatalog. To run the HCatalog server from the shell in Hive release 0.11.0 and later:

$ $HIVE_HOME/hcatalog/sbin/hcat_server.sh

To use the HCatalog command line interface (CLI) in Hive release 0.11.0 and later:

$ $HIVE_HOME/hcatalog/bin/hcat

For more information, see HCatalog Installation from Tarball and HCatalog CLI in the HCatalog manual. Running WebHCat (Templeton). To run the WebHCat server from the shell in Hive release 0.11.0 and later:

$ $HIVE_HOME/hcatalog/sbin/webhcat_server.sh

For more information, see WebHCat Installation in the WebHCat manual. DDL Operations: https://cwiki.apache.org/confluence/display/Hive/GettingStarted#GettingStarted-DDLOperations. Start the metastore service with `$HIVE_HOME/bin/hive --service metastore &` and start hiveserver2 with `$HIVE_HOME/bin/hive --service hiveserver2`. HiveClient: https://cwiki.apache.org/confluence/display/Hive/HiveClient (last edited by Lefty Leverenz on Aug 20, 2014). Since I am using hadoop 2.5.0 with hive 0.13.1, some of that documentation does not apply. Below is the modified Python code; adjust the hive path and the data file path in the code to suit your setup.

#!/usr/bin/env python

import sys
sys.path.append('/usr/local/share/applications/apache-hive-0.13.1-bin/apache-hive-0.13.1-bin/lib/py')
from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

try:
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    client = ThriftHive.Client(protocol)
    transport.open()

    client.execute("CREATE TABLE IF NOT EXISTS r(a STRING, b INT, c DOUBLE)")
    client.execute("LOAD DATA LOCAL INPATH '/usr/local/share/applications/apache-hive-0.13.1-bin/apache-hive-0.13.1-bin/ml-100k/u.data' OVERWRITE  INTO TABLE r")
    client.execute("SELECT * FROM r")
    while (1):
      row = client.fetchOne()
      if (row == None):
        break
      print row
    client.execute("SELECT * FROM r")
    print client.fetchAll()

    transport.close()

except Thrift.TException, tx:
    print '%s' % (tx.message)

Platform: Fedora release 19 (Schrödinger's Cat), 3.14.13-100.fc19.x86_64. Installing software:

$ sudo yum install ssh
$ sudo yum install rsync

Download hadoop: Apache Download Mirrors. Preparation: go into the hadoop directory:

$ vi etc/hadoop/hadoop-env.sh

set to the root of your Java installation

export JAVA_HOME=/path/to/java/home

Assuming your installation directory is /usr/local/hadoop

export HADOOP_PREFIX=/path/to/hadoop/home

Pseudo-distributed configuration: in the hadoop directory, edit etc/hadoop/core-site.xml:

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost.localdomain:9000</value>
  </property>
</configuration>

Because Fedora's /etc/hosts resolves localhost.localdomain to 127.0.0.1 by default, this differs slightly from the official documentation. etc/hadoop/hdfs-site.xml:

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

Set up an SSH key:

$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

Before following the official documentation, check /etc/ssh/sshd_config: uncomment "Protocol 2" (the default requires explicit activation of protocol 1), add "HostKey ~/.ssh/id_dsa", and check the AuthorizedKeysFile setting (the default is to check both .ssh/authorized_keys and .ssh/authorized_keys2, but this is overridden so installations will only check .ssh/authorized_keys, i.e. "AuthorizedKeysFile .ssh/authorized_keys"). Enable the sshd service at boot:

$ systemctl enable sshd.service

Start the sshd service:

$ service sshd start

Open port 22:

$ iptables -A INPUT -p tcp --dport 22 -j ACCEPT
