Auswahl der Partition eines Azure Event Hubs bei Verwendung der REST-API
Ich versuche, mit Python und der REST-API Nachrichten an einen Azure Event Hub zu senden. Nach einigen fehlgeschlagenen Versuchen habe ich funktionierenden Code gefunden (siehe unten), möchte jedoch auswählen können, an welche Partition das Ereignis gesendet wird.
Ist dies mit der REST-API möglich, und wenn ja, wie sollte es gemacht werden?
#!/usr/bin/python
import json
from datetime import datetime
from multiprocessing import Pool
# from azure.servicebus import _service_bus_error_handler
from azure.servicebus.servicebusservice import ServiceBusService, ServiceBusSASAuthentication
from azure.http import (
HTTPRequest,
HTTPError
)
from azure.http.httpclient import _HTTPClient
# Address of the target hub: namespace host name and hub name, both handed to
# EventHubClient by send_monitoring_event(). (Presumably placeholder values.)
EVENT_HUB_HOST = "mysecrethub.servicebus.windows.net"
EVENT_HUB_NAME = "secerthub-name"
# Shared-access key name/value used to sign each request.
KEYNAME = "senderkey" # needs to be loaded from ENV
KEYVALUE = "keyvalue" # needs to be loaded from ENV
# Header items passed as additional_headers on every send; each item is
# appended unchanged to the request's header list.
EXTRA_HEADERS = []
# Used both as the worker-pool size and as the number of partition indices
# (0 .. NUM_OF_PARTITIONS - 1) that main() maps over.
NUM_OF_PARTITIONS = 16
class EventHubClient(object):
    """Send events to an Azure Event Hub through its REST endpoint.

    The host, hub name and shared-access key name/value are stored at
    construction time and reused for every request.
    """

    def __init__(self, host, hubname, keyname, keyvalue):
        """Store the hub address and the shared-access credentials.

        Args:
            host: Host name the request is sent to.
            hubname: Name of the event hub; first segment of the request path.
            keyname: Name of the shared-access key used for signing.
            keyvalue: Value of the shared-access key used for signing.
        """
        self._host = host
        self._hub = hubname
        self._keyname = keyname
        self._key = keyvalue

    def _message_path(self, partition_id=None):
        """Return the request path for the hub or for one of its partitions.

        Args:
            partition_id: Partition to address, or None for the hub-level
                endpoint. ``0`` is a valid id, hence the explicit None test.
        """
        if partition_id is None:
            return "/%s/messages?api-version=2014-01" % (self._hub,)
        return "/%s/partitions/%s/messages?api-version=2014-01" % (
            self._hub, partition_id)

    def sendMessage(self, body, partition=None, additional_headers=None,
                    partition_id=None):
        """POST one event to the hub and return the HTTP status code.

        Args:
            body: Event payload. Text (unicode) is encoded as UTF-8 so that
                Content-Length counts bytes, matching the declared charset.
            partition: Optional partition *key*. Sent as ``PartitionKey``
                inside the ``BrokerProperties`` header; it does not name a
                partition directly.
            additional_headers: Optional iterable of header items, each
                appended unchanged to the request headers.
            partition_id: Optional partition id. When given, the event is
                posted to ``/<hub>/partitions/<partition_id>/messages``
                instead of the hub-level ``/<hub>/messages`` endpoint.

        Returns:
            The status of the HTTP response, or the status carried by the
            HTTPError if the request failed.

        Raises:
            ValueError: If both ``partition`` and ``partition_id`` are given.
        """
        # Refuse ambiguous addressing instead of guessing which one wins.
        if partition is not None and partition_id is not None:
            raise ValueError(
                "pass either partition (key) or partition_id, not both")

        # type(u"") is `unicode` on Python 2 and `str` on Python 3.
        if isinstance(body, type(u"")):
            body = body.encode("utf-8")

        httpclient = _HTTPClient(service_instance=self)
        authentication = ServiceBusSASAuthentication(self._keyname, self._key)

        request = HTTPRequest()
        request.method = "POST"
        request.host = self._host
        request.protocol_override = "https"
        request.path = self._message_path(partition_id)
        request.body = body
        request.headers.append(
            ('Content-Type', 'application/atom+xml;type=entry;charset=utf-8'))

        if additional_headers is not None:
            for item in additional_headers:
                request.headers.append(item)
        if partition is not None:
            value = json.dumps({'PartitionKey': partition})
            request.headers.append(('BrokerProperties', value))

        authentication.sign_request(request, httpclient)
        request.headers.append(('Content-Length', str(len(request.body))))

        status = 0
        try:
            resp = httpclient.perform_request(request)
            status = resp.status
        except HTTPError as ex:
            # A failed request is reported through its status code, not raised.
            status = ex.status
        return status
def prepare_message(appid, sessionid, partitionKey=None, SessionEllapsed=None, DeviceOs=None):
    """Build the JSON text of a "MonitorEvent" message.

    Args:
        appid: Stored as ``Attributes.AppId``.
        sessionid: Accepted for interface compatibility; it is currently not
            written into the message.
        partitionKey: When not None, stored (as a string) under both
            ``PartitionKey`` and ``PartitionId`` and (unchanged) under
            ``Attributes.ItemId``.
        SessionEllapsed: When not None, stored as ``Attributes.SessionEllapsed``.
        DeviceOs: When not None, stored as ``Attributes.DeviceOs``.

    Returns:
        The message serialized with ``json.dumps``.
    """
    # Format explicitly with %f and trim to milliseconds. str(datetime) omits
    # the fraction when microsecond == 0, so slicing its last three characters
    # would cut off the seconds instead.
    started = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]

    message = {"Name": "MonitorEvent"}
    Attributes = {"AppId": appid, "SessionStarted": started}
    if SessionEllapsed is not None:
        Attributes['SessionEllapsed'] = SessionEllapsed
    if DeviceOs is not None:
        Attributes['DeviceOs'] = DeviceOs
    if partitionKey is not None:
        message["PartitionKey"] = str(partitionKey)
        message["PartitionId"] = str(partitionKey)
        Attributes['ItemId'] = partitionKey
    message['Attributes'] = Attributes
    return json.dumps(message)
def send_monitoring_event(partition):
    """Build one monitoring message for ``partition`` and send it to the hub.

    Args:
        partition: Partition index; embedded in the message payload and in
            the ``DeviceOs`` label ("Monitor<partition>").

    Returns:
        The HTTP status returned by ``EventHubClient.sendMessage``.
    """
    hubClient = EventHubClient(EVENT_HUB_HOST, EVENT_HUB_NAME, KEYNAME, KEYVALUE)
    appid = 1
    # '%m' is the month; the former '%M' put the minutes into the date part.
    sendertime = datetime.now().strftime('%Y%m%d-%H%M%S')
    message = prepare_message(appid, sendertime, partitionKey=partition,
                              SessionEllapsed=1, DeviceOs='Monitor' + str(partition))
    # partition=None: no BrokerProperties header is added, so the partition
    # number travels only inside the JSON payload.
    hubStatus = hubClient.sendMessage(message, partition=None, additional_headers=EXTRA_HEADERS)
    # return the HTTP status to the caller
    return hubStatus
def main():
    """Send one monitoring event per partition index in parallel.

    Starts NUM_OF_PARTITIONS worker processes, maps send_monitoring_event
    over the indices 0 .. NUM_OF_PARTITIONS - 1 and prints the list of
    returned HTTP statuses.
    """
    pool = Pool(processes=NUM_OF_PARTITIONS)
    try:
        statuses = pool.map(send_monitoring_event, range(NUM_OF_PARTITIONS))
    finally:
        # Always shut the workers down, even when a task raised, so no
        # worker processes are left behind.
        pool.close()
        pool.join()
    # Single-argument parenthesized print behaves the same on Python 2 and 3.
    print(statuses)


if __name__ == '__main__':
    main()