Kafka RPC是基于kafka的RPC协议,旨在提供快速,稳定,可靠的远程呼叫服务。
我们之所以喜欢kafka,是因为它具有容错能力,可扩展性和邪恶的大吞吐量。
因此,如果您想要具有kafka功能的RPC服务,则kRPC是您正在寻找的工具。
pip install kafka-rpc
什么是RPC?
RPC是一种请求-响应协议。客户端启动RPC,客户端将请求消息发送到已知的远程服务器,以使用提供的参数执行指定的过程。
发布作业请求的一个称为客户端,而另一个获得作业和响应的称为服务器。
什么时候应该使用RPC?
当您必须调用不属于本地进程或计算机的函数,而又不想建立另一个复杂的网络框架来实现静态API或Soap等时,RPC比静态API快得多并且更容易使用。仅需调整几行代码即可实现RPC服务。
为什么是kafka-rpc,而不是其他RPC协议(如zerorpc,grpc和mprpc)?
这是比较。
RPC | 中间件 | 序列化 | 速度(QPS) | 特征 |
---|---|---|---|---|
Kafka | Kafka | 消息包 | 200+(同步)4700+(异步) | 动态负载重新平衡,大吞吐量,数据持久性,更快的序列化 |
清零 | 零平方米 | 消息包 | 450+ | 动态负载重新平衡(当所有服务器繁忙时失败),更快的序列化 |
grpc | 未知 | 原虫 | 未经测试 | 动态负载重新平衡,大吞吐量,仅支持功能rpc |
电脑 | 没有 | 消息包 | 19000+ | 光速 |
基准环境:
Ubuntu 19.10 x64(5.3.0-21-通用)
英特尔i5-8400
我开发kafka-rpc的唯一原因是zerorpc使我失败了!
经过几个月的搜索和测试,zerorpc是我曾经使用过的最好的rpc服务,但是它很容易出错!通常,开发人员不会注意到,因为在大多数情况下,我们使用RPC将作业从一个客户端直接发布到一台服务器。
但是,当您开发分发系统时,您将不得不将K种不同类型的作业从N个客户端发布到M个服务器。
问题是您实际上并不知道哪个服务器可用,或者不适合该服务器。
这就是负载平衡的地方。
Zeromq支持,zerorpc也支持。在反向代理模式下,zerorpc将发布作业,但不发送至服务器。可用的服务器将积极寻找工作。因此,这项工作将始终由最可用,因此性能最高的服务器来解决。但是,可悲的是,zerorpc并没有真正将作业排队,因此,当您根本没有可用的服务器时,这些作业将不会排队等待,而是会被放弃。
这就是为什么我需要 Kafka 。与大多数MQ不同,kafka提供了数据持久性,因此不再需要放弃任何工作。当所有服务器都不可用时,作业将排队等待排队。
如果客户端崩溃,则作业仍将保留在磁盘上(kafka功能),且没有损坏(带有副本)(您能想象得到)。
另外,kafka-rpc支持动态可伸缩性,可以始终将服务器添加到群集中或将其删除,因此作业将公平地分配到所有服务器,并且如果分配的服务器关闭,作业将重新路由到另一台正常运行的服务器。
尽管有一些小缺点,但它们都是伟大的程序员开发的好工具,我们总是欢迎您为他们的原始存储库以及支持numpy数组的我的forkzeroc和mprpc做出贡献。
# Part1: define a class class Sum: def add(self, x, y): return x + y # Part2: instantiate a class to an object s = Sum() # Part3: call a method of the object result = s.add(1, 2) # result = 3
from kafka_rpc import KRPCServer # Part1: define a class class Sum: def add(self, x, y): return x + y # Part2: instantiate a class to an object s = Sum() # assuming you kafka broker is on 0.0.0.0:9092 krs = KRPCServer('0.0.0.0:9092', handle=s, topic_name='sum') krs.server_forever()
from kafka_rpc import KRPCClient # assuming you kafka broker is on 0.0.0.0:9092 krc = KRPCClient('0.0.0.0:9092', topic_name='sum') # call method from client to server result = krc.add(1, 2) print(result) krc.close() # you can find the returned result in result['ret'] # result = { # 'ret': 3, # 'tact_time': 0.007979869842529297, # total process time # 'tact_time_server': 0.006567955017089844, # process time on server side # 'server_id': '192.168.1.x' # client ip # }
启用服务器端并发。分别,必须同时发送多个请求。
import time from kafka_rpc import KRPCServer # Part1: define a class class Sum: def add(self, x, y): # simulate blocking actions like I/O time.sleep(0.1) return x + y # Part2: instantiate a class to an object s = Sum() # assuming you kafka broker is on 0.0.0.0:9092 krs = KRPCServer('0.0.0.0:9092', handle=s, topic_name='sum', concurrent=128) krs.server_forever()
from concurrent.futures import ThreadPoolExecutor, as_completed from kafka_rpc import KRPCClient pool = ThreadPoolExecutor(128) # assuming you kafka broker is on 0.0.0.0:9092 krc = KRPCClient('0.0.0.0:9092', topic_name='sum') # call method concurrently from client to server # use pool.map if you like futures = [] for i in range(128): futures.append(pool.submit(krc.add, 1, 2, timeout=20)) for future in as_completed(futures): result = future.result() print(result) krc.close()
通过将use_redis = True添加到KRPCClient来启用redis加快缓存并临时存储输入/输出数据,或指定redis端口,db和密码。但是redis不支持异步操作,它会崩溃,仅在同步模式下会更快。
krc = KRPCClient('0.0.0.0:9092', topic_name='sum', use_redis=True, redis_port=6379, redis_db=0, redis_password='kafka_rpc.no.1')
通过向客户端和服务器两者添加verify = True或encryptize ='whatever_password + you / want'或两者来提高通信安全性。但是启用验证和加密对性能几乎没有影响。
# basic verification and encryption krs = KRPCServer('0.0.0.0:9092', handle=s, topic_name='sum', verify=True, encrypt='whatever_password+you/want') krc = KRPCClient('0.0.0.0:9092', 9092, topic_name='sum', verify=True, encrypt='whatever_password+you/want') # advanced verification and encryption with custom hash function, input: bytes, output: bytes krs = KRPCServer('0.0.0.0:9092', handle=s, topic_name='sum', verify=True, encrypt='whatever_password+you want', verification=lambda x: sha3_224(x).hexdigest().encode()) krc = KRPCClient('0.0.0.0:9092', topic_name='sum', verify=True, encrypt='whatever_password+you/want', verification=lambda x: sha3_224(x).hexdigest().encode())
使用zstd压缩和解压缩数据
krs = KRPCServer('0.0.0.0:9092', handle=s, topic_name='sum', use_compression=True) krc = KRPCClient('0.0.0.0:9092', 9092, topic_name='sum', use_compression=True)
如果use_redis = False,则不能多次实例化KRPCClient
如果use_redis = False,则不能多次实例化KRPCClient
如果use_redis = False,则不能多次实例化KRPCClient
你现在还记得吗?