kafka-rpc - 基于kafka的RPC协议。

Created at: 2020-01-03 16:39:23
Language: Python
License: Apache-2.0

Kafka RPC


介绍

Kafka RPC是基于kafka的RPC协议,旨在提供快速,稳定,可靠的远程呼叫服务。

我们之所以喜欢kafka,是因为它具有容错能力,可扩展性和邪恶的大吞吐量。

因此,如果您想要具有kafka功能的RPC服务,则kRPC是您正在寻找的工具。


安装

    pip install kafka-rpc

常问问题

  1. 什么是RPC

    RPC是一种请求-响应协议。客户端启动RPC,客户端将请求消息发送到已知的远程服务器,以使用提供的参数执行指定的过程。

    发布作业请求的一个称为客户端,而另一个获得作业和响应的称为服务器。

  2. 什么时候应该使用RPC?

    当您必须调用不属于本地进程或计算机的函数,而又不想建立另一个复杂的网络框架来实现静态API或Soap等时,RPC比静态API快得多并且更容易使用。仅需调整几行代码即可实现RPC服务。

  3. 为什么是kafka-rpc,而不是其他RPC协议(如zerorpcgrpcmprpc)

    这是比较。

    RPC 中间件 序列化 速度(QPS) 特征
    Kafka Kafka 消息包 200+(同步)4700+(异步) 动态负载重新平衡,大吞吐量,数据持久性,更快的序列化
    清零 零平方米 消息包 450+ 动态负载重新平衡(当所有服务器繁忙时失败),更快的序列化
    grpc 未知 原虫 未经测试 动态负载重新平衡,大吞吐量,仅支持功能rpc
    电脑 没有 消息包 19000+ 光速

    基准环境:

    Ubuntu 19.10 x64(5.3.0-21-通用)

    英特尔i5-8400


    我开发kafka-rpc的唯一原因是zerorpc使我失败了!

    经过几个月的搜索和测试,zerorpc是我曾经使用过的最好的rpc服务,但是它很容易出错!通常,开发人员不会注意到,因为在大多数情况下,我们使用RPC将作业从一个客户端直接发布到一台服务器。

    但是,当您开发分发系统时,您将不得不将K种不同类型的作业从N个客户端发布到M个服务器。

    问题是您实际上并不知道哪个服务器可用,或者不适合该服务器。

    这就是负载平衡的地方。

    Zeromq支持,zerorpc也支持。在反向代理模式下,zerorpc将发布作业,但不发送至服务器。可用的服务器将积极寻找工作。因此,这项工作将始终由最可用,因此性能最高的服务器来解决。但是,可悲的是,zerorpc并没有真正将作业排队,因此,当您根本没有可用的服务器时,这些作业将不会排队等待,而是会被放弃。

    这就是为什么我需要 Kafka 。与大多数MQ不同,kafka提供了数据持久性,因此不再需要放弃任何工作。当所有服务器都不可用时,作业将排队等待排队。

    如果客户端崩溃,则作业仍将保留在磁盘上(kafka功能),且没有损坏(带有副本)(您能想象得到)。

    另外,kafka-rpc支持动态可伸缩性,可以始终将服务器添加到群集中或将其删除,因此作业将公平地分配到所有服务器,并且如果分配的服务器关闭,作业将重新路由到另一台正常运行的服务器。

    尽管有一些小缺点,但它们都是伟大的程序员开发的好工具,我们总是欢迎您为他们的原始存储库以及支持numpy数组的我的forkzerocmprpc做出贡献。

下一步

  • [X]考虑到其较大的吞吐量优势,允许异步调用来优化QPS
  • [X]使用gevent而不是内置线程来加快速度,现在速度提高了40%。
  • []用cython重写

用法

假设您已经有一个对象,并且一切正常,就像这样

local_call.py

# Part1: define a class
class Sum:
    def add(self, x, y):
        return x + y

# Part2: instantiate a class to an object
s = Sum()

# Part3: call a method of the object
result = s.add(1, 2)  # result = 3

然后,您可以使用RPC在process1上运行Part1和Part2,并从process2调用Part3,即过程1的方法。

kafka_rpc_server_demo.py

from kafka_rpc import KRPCServer

# Part1: define a class
class Sum:
    def add(self, x, y):
        return x + y

# Part2: instantiate a class to an object
s = Sum()

# assuming you kafka broker is on 0.0.0.0:9092
krs = KRPCServer('0.0.0.0:9092', handle=s, topic_name='sum')
krs.server_forever()

kafka_rpc_client_demo.py

from kafka_rpc import KRPCClient

# assuming you kafka broker is on 0.0.0.0:9092
krc = KRPCClient('0.0.0.0:9092', topic_name='sum')

# call method from client to server
result = krc.add(1, 2)

print(result)

krc.close()

# you can find the returned result in result['ret']
# result = {
#     'ret': 3,
#     'tact_time': 0.007979869842529297,  # total process time
#     'tact_time_server': 0.006567955017089844,  # process time on server side
#     'server_id': '192.168.1.x'  # client ip
# }

高级用法

  1. 启用服务器端并发。分别,必须同时发送多个请求。

    kafka_rpc_server_concurrency_demo.py

     import time
     from kafka_rpc import KRPCServer
     
     
     # Part1: define a class
     class Sum:
         def add(self, x, y):
     
             # simulate blocking actions like I/O
             time.sleep(0.1)
     
             return x + y
     
     
     # Part2: instantiate a class to an object
     s = Sum()
     
     # assuming you kafka broker is on 0.0.0.0:9092
     krs = KRPCServer('0.0.0.0:9092', handle=s, topic_name='sum', concurrent=128)
     krs.server_forever()
    

    kafka_rpc_client_async_demo.py

     from concurrent.futures import ThreadPoolExecutor, as_completed
     from kafka_rpc import KRPCClient
     
     pool = ThreadPoolExecutor(128)
     
     # assuming you kafka broker is on 0.0.0.0:9092
     krc = KRPCClient('0.0.0.0:9092', topic_name='sum')
     
     # call method concurrently from client to server
     # use pool.map if you like
     futures = []
     for i in range(128):
         futures.append(pool.submit(krc.add, 1, 2, timeout=20))
     
     for future in as_completed(futures):
         result = future.result()
         print(result)
     
     krc.close()
    
  2. 通过将use_redis = True添加到KRPCClient来启用redis加快缓存并临时存储输入/输出数据,或指定redis端口,db和密码。但是redis不支持异步操作,它会崩溃,仅在同步模式下会更快。

     krc = KRPCClient('0.0.0.0:9092', topic_name='sum', use_redis=True, redis_port=6379, redis_db=0, redis_password='kafka_rpc.no.1')
    
  3. 通过向客户端和服务器两者添加verify = True或encryptize ='whatever_password + you / want'或两者来提高通信安全性。但是启用验证和加密对性能几乎没有影响。

     # basic verification and encryption
     krs = KRPCServer('0.0.0.0:9092', handle=s, topic_name='sum', verify=True, encrypt='whatever_password+you/want')
     krc = KRPCClient('0.0.0.0:9092', 9092, topic_name='sum', verify=True, encrypt='whatever_password+you/want')
     
     # advanced verification and encryption with custom hash function, input: bytes, output: bytes
     krs = KRPCServer('0.0.0.0:9092', handle=s, topic_name='sum', verify=True, encrypt='whatever_password+you want', verification=lambda x: sha3_224(x).hexdigest().encode())
     krc = KRPCClient('0.0.0.0:9092', topic_name='sum', verify=True, encrypt='whatever_password+you/want', verification=lambda x: sha3_224(x).hexdigest().encode())
    
  4. 使用zstd压缩和解压缩数据

     krs = KRPCServer('0.0.0.0:9092', handle=s, topic_name='sum', use_compression=True)
     krc = KRPCClient('0.0.0.0:9092', 9092, topic_name='sum', use_compression=True)
    

警告

如果use_redis = False,则不能多次实例化KRPCClient

如果use_redis = False,则不能多次实例化KRPCClient

如果use_redis = False,则不能多次实例化KRPCClient

你现在还记得吗?