100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > 微信企业号回调模式 java_微信企业号回调模式验证与发送消息

微信企业号回调模式 java_微信企业号回调模式验证与发送消息

时间:2021-05-18 16:02:58

相关推荐

微信企业号回调模式 java_微信企业号回调模式验证与发送消息

最近放假闲着无聊,研究了一下微信企业号, 打算通过企业号做一个运维报警信息发送的功能,记录自己的操作

第一步

注册企业号,网上一搜一大把的教程,这里略过 微信企业号登录地址 https://qy./

第二步 登录后 点左侧 应用中心 -新建应用

第三步 在第二步第一图中的自建应用下面找到刚刚新建的应用 拉到最下面有一个模式选择,点击回调模式 会看到下图界面

Token 和EncodingAESKey 点击随机获取即可,上面的url需要你有自己的服务地址 你的服务接口需要跟据微信规则对请求信息加解密,然后返回一个明文字符串,后面会上代码 url先留空,等下服务弄好再回来填写 ,此时微信端就设置完成 页面不要关闭

第四步 先上验证url代码 python版本实现 本人是java程序员,用python来搞这个也是一时兴趣,用起来到是比java方便些

#-*- coding:utf-8 -*-

from SimpleHTTPServer importSimpleHTTPRequestHandlerimporthttpserverfrom SocketServer importThreadingMixInfrom SocketServer importThreadingTCPServerimportSocketServerfrom SocketServer importBaseRequestHandlerimporturlparsefrom WXBizMsgCrypt importWXBizMsgCryptimportxml.etree.cElementTree as ET

hostIP= ''portNum= 8080serverMessage= "msg_signature"sToken= "第三步中的token"sEncodingAESKey= "第三步中的EncodingAESKey"sCorpID= "这个是你的企业id 在企业号主页面设置中账号信息里有"

classmySoapServer(SimpleHTTPRequestHandler ):defdo_head(self):pass

defdo_GET(self):try:if self.path.find(serverMessage) == -1:

self.send_error(404, message=None)return

#解析请求

result =urlparse.urlparse(self.path)

params=urlparse.parse_qs(result.query,True)printparams

sVerifyMsgSig= params['msg_signature'][0]

sVerifyTimeStamp=params['timestamp'][0]

sVerifyNonce=params['nonce'][0]

sVerifyEchoStr= params['echostr'][0]

wxcpt=WXBizMsgCrypt(sToken, sEncodingAESKey, sCorpID)

ret, sEchoStr=wxcpt.VerifyURL(sVerifyMsgSig, sVerifyTimeStamp, sVerifyNonce, sVerifyEchoStr)print "ret=",ret

self.send_response(200, message=None)

self.send_header('Content-type', 'text/html')

self.end_headers()if ret ==0:

res= sEchoStr.decode("utf8")else:

res= "%s" %retprintres

self.wfile.write(res.encode(encoding='utf_8', errors='strict'))exceptIOError:

self.send_error(404, message=None)defGetQuery(str):

params= str[str.index("?") + 1:]

parsed_result={}

list= [param for param in params.split('&')]for item inlist:if item.find("=") > -1:

name= item[:item.index("=")]

value= urlparse.urlparse(item[item.index("=") + 1:])else:

name=item

value= ""

if name inparsed_result:

parsed_result[name].append(value)else:

parsed_result[name]=[value]return parsed_result

classThreadingHttpServer(ThreadingMixIn, ThreadingTCPServer):passmyServer=ThreadingHttpServer((hostIP, portNum), mySoapServer)print("Server Started ....")

myServer.serve_forever()

myServer.server_close()

把上面代码保存为server.py 把代码中的sToken ,sEncodingAESKeysCorpID 换成你自己的,其它不用动

在服务器上运行

python server.py

建议使用python2.6.*版本 其它版本未测试 不保证运行正常

其中 WXBizMsgCrypt包为微信提供 下载地址 http://qydev./download/python.zip

遇到问题执行

yum install python-devel.x86_64

pip install yum -y install gcc

pip install pycrypto

第五步 返回第三步中填写url http://你的ip:8080 点击验证 不出意外验证通过

当然,本人python小白到是出了很多意外,坑还比较多, 不过做的过程没有记录问题,只记下了最终正确的过程

第六步 上面过程验证通过后创建一个管理组,这里貌似至少有两个人关注公众号,因为管理组要求至少有一个非企业号创建人

添加好后应该就会拿到CorpID 和 Secret 这两个记录下来,之后发消息需要

第七步 发送消息 上代码

#!/usr/bin/python#encoding: utf-8#-*- coding: utf8 -*-

importurllibimporturllib2importjsonimportdatetimeclassWeiXinSendMsgClass(object):def __init__(self):

self.access_token=get_token()

self.to_user= ""self.to_party= ""self.to_tag= "2"self.msg_type= "text"self.agent_id= 1self.content= ""self.safe=0

self.data={"touser": self.to_user,"toparty": self.to_party,"totag": self.to_tag,"msgtype": self.msg_type,"agentid": self.agent_id,"text": {"content": self.content

},"safe": self.safe

}defsend(self, to_user, content):if to_user is not None and content is notNone:

self.data['touser'] =to_user

self.data['text']['content'] =contentelse:print

raiseRuntimeErrorimportrequestsimportjson

url= "https://qyapi./cgi-bin/message/send"querystring= {"access_token": self.access_token}

payload= json.dumps(self.data, encoding='utf-8', ensure_ascii=False)

headers={'content-type': "application/json",'cache-control': "no-cache",

}

response= requests.request("POST", url, verify=False,data=payload, headers=headers, params=querystring)

return_content=json.loads(response.content)if return_content["errcode"] == 0 and return_content["errmsg"] == "ok":print "Send successfully! %s" %return_contentelse:print "Send failed! %s" %return_contentdefget_token():

parameters={"corpid": "上一步中记录的CorpID","corpsecret": "上一步记录的Secret"}

url_parameters=urllib.urlencode(parameters)

token_url= "https://qyapi./cgi-bin/gettoken?"url= token_url +url_parameters

response=urllib2.urlopen(url)

result=response.read()

token_json=json.loads(result)

token_json= token_json['access_token']return token_json

上面代码只需要把corpid 和corpsecret 换成自己的就可以了 保存为wx_sendMessage.py

然后就可以在你需要发送消息的地方调用 啦

#-*- coding:utf-8 -*-

importwx_sendMessage

msg=wx_sendMessage.WeiXinSendMsgClass()

msg.send("目标用户账号", "这里是发送的消息")

大功告成,到此为止已经可以完成对某一个关注企业号的用户发送消息啦,可是还不够,群发是必须滴

群组发送的话就再扩展一下send方法 再加一个参数totag 就是标签id

可以在通讯录新建标签,然后把相应的人员加进去 见下图 右侧标签id就是需要传的参数值

修改如下,前后代码省略

defsend(self, to_user, totag,content):if to_user is not None and content is notNone:

self.data['touser'] =to_user

self.data['totag'] =totag

self.data['text']['content'] =contentelse:print

raise RuntimeError

好啦,这篇文章到这里就结束 了,如果有同学们正好也在做着跟我同样的事情并且看到我这篇文章的话,有不明白的话可以在文章下面留言,本人看到的话会及时回复的!!!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。